Skip to content

Commit

Permalink
feat(query): Procedure support drop if exists and create or replace (#โ€ฆ
Browse files Browse the repository at this point in the history
โ€ฆ16500)

* feat(query): support drop procedure if exists

* support create or replace procedure

* fix conversation

* refactor: support create_option and drop if exisits

* add_procedure return Result<std::result::Result<CreateProcedureReply, KVAppError>>
  • Loading branch information
TCeason authored Sep 26, 2024
1 parent 99e039f commit 7e14ac0
Show file tree
Hide file tree
Showing 12 changed files with 275 additions and 60 deletions.
14 changes: 2 additions & 12 deletions src/meta/app/src/principal/procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use databend_common_expression::types::DataType;
use crate::principal::procedure_id_ident::ProcedureIdIdent;
use crate::principal::procedure_name_ident::ProcedureNameIdent;
use crate::principal::ProcedureIdentity;
use crate::schema::CreateOption;
use crate::tenant::Tenant;
use crate::tenant::ToTenant;
use crate::KeyWithTenant;
Expand Down Expand Up @@ -85,22 +84,15 @@ impl Display for ProcedureMeta {

#[derive(Clone, Debug, PartialEq)]
pub struct CreateProcedureReq {
pub create_option: CreateOption,
pub name_ident: ProcedureNameIdent,
pub meta: ProcedureMeta,
}

impl Display for CreateProcedureReq {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
let typ = match self.create_option {
CreateOption::Create => "create_procedure",
CreateOption::CreateIfNotExists => "create_procedure_if_not_exists",
CreateOption::CreateOrReplace => "create_or_replace_procedure",
};
write!(
f,
"{}:{}/{}={:?}",
typ,
"{}/{}={:?}",
self.name_ident.tenant_name(),
self.name_ident.procedure_name(),
self.meta
Expand Down Expand Up @@ -137,16 +129,14 @@ pub struct RenameProcedureReply {}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DropProcedureReq {
pub if_exists: bool,
pub name_ident: ProcedureNameIdent,
}

impl Display for DropProcedureReq {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"drop_procedure(if_exists={}):{}/{}",
self.if_exists,
"drop_procedure:{}/{}",
self.name_ident.tenant_name(),
self.name_ident.procedure_name(),
)
Expand Down
13 changes: 11 additions & 2 deletions src/query/ast/src/ast/statements/procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,11 @@ impl Display for CreateProcedureStmt {
if let CreateOption::CreateOrReplace = self.create_option {
write!(f, "OR REPLACE ")?;
}
write!(f, "PROCEDURE {}", self.name.name)?;
write!(f, "PROCEDURE ")?;
if let CreateOption::CreateIfNotExists = self.create_option {
write!(f, "IF NOT EXISTS ")?;
}
write!(f, "{}", self.name.name)?;
if let Some(args) = &self.args {
if args.is_empty() {
write!(f, "() ")?;
Expand Down Expand Up @@ -137,12 +141,17 @@ impl Display for CreateProcedureStmt {

#[derive(Debug, Clone, PartialEq, Drive, DriveMut)]
pub struct DropProcedureStmt {
pub if_exists: bool,
pub name: ProcedureIdentity,
}

impl Display for DropProcedureStmt {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
write!(f, "DROP PROCEDURE {}", self.name)?;
write!(f, "DROP PROCEDURE ")?;
if self.if_exists {
write!(f, "IF EXISTS ")?;
}
write!(f, "{}", self.name)?;

Ok(())
}
Expand Down
26 changes: 21 additions & 5 deletions src/query/ast/src/parser/statement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2067,10 +2067,25 @@ pub fn statement_body(i: Input) -> IResult<Statement> {
// [ COMMENT = '<string_literal>' ] AS <procedure_definition>
let create_procedure = map_res(
rule! {
CREATE ~ ( OR ~ ^REPLACE )? ~ PROCEDURE ~ #ident ~ #procedure_arg ~ RETURNS ~ #procedure_return ~ LANGUAGE ~ SQL ~ (COMMENT ~ "=" ~ #literal_string)? ~ AS ~ #code_string
CREATE ~ ( OR ~ ^REPLACE )? ~ PROCEDURE ~ ( IF ~ ^NOT ~ ^EXISTS )? ~ #ident ~ #procedure_arg ~ RETURNS ~ #procedure_return ~ LANGUAGE ~ SQL ~ (COMMENT ~ "=" ~ #literal_string)? ~ AS ~ #code_string
},
|(_, opt_or_replace, _, name, args, _, return_type, _, _, opt_comment, _, script)| {
let create_option = parse_create_option(opt_or_replace.is_some(), false)?;
|(
_,
opt_or_replace,
_,
opt_if_not_exists,
name,
args,
_,
return_type,
_,
_,
opt_comment,
_,
script,
)| {
let create_option =
parse_create_option(opt_or_replace.is_some(), opt_if_not_exists.is_some())?;

let name = ProcedureIdentity {
name: name.to_string(),
Expand Down Expand Up @@ -2137,10 +2152,11 @@ pub fn statement_body(i: Input) -> IResult<Statement> {

let drop_procedure = map(
rule! {
DROP ~ PROCEDURE ~ #ident ~ #procedure_type_name
DROP ~ PROCEDURE ~ ( IF ~ ^EXISTS )? ~ #ident ~ #procedure_type_name
},
|(_, _, name, args)| {
|(_, _, opt_if_exists, name, args)| {
Statement::DropProcedure(DropProcedureStmt {
if_exists: opt_if_exists.is_some(),
name: ProcedureIdentity {
name: name.to_string(),
args_type: if args.is_empty() {
Expand Down
19 changes: 19 additions & 0 deletions src/query/ast/tests/it/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -822,10 +822,29 @@ fn test_statement() {
r#"describe PROCEDURE p1()"#,
r#"describe PROCEDURE p1(string, timestamp)"#,
r#"drop PROCEDURE p1()"#,
r#"drop PROCEDURE if exists p1()"#,
r#"drop PROCEDURE p1(int, string)"#,
r#"call PROCEDURE p1()"#,
r#"call PROCEDURE p1(1, 'x', '2022-02-02'::Date)"#,
r#"show PROCEDURES like 'p1%'"#,
r#"create or replace PROCEDURE p1() returns string not null language sql comment = 'test' as $$
BEGIN
LET sum := 0;
FOR x IN SELECT * FROM numbers(100) DO
sum := sum + x.number;
END FOR;
RETURN sum;
END;
$$;"#,
r#"create PROCEDURE if not exists p1() returns string not null language sql comment = 'test' as $$
BEGIN
LET sum := 0;
FOR x IN SELECT * FROM numbers(100) DO
sum := sum + x.number;
END FOR;
RETURN sum;
END;
$$;"#,
r#"create PROCEDURE p1() returns string not null language sql comment = 'test' as $$
BEGIN
LET sum := 0;
Expand Down
110 changes: 110 additions & 0 deletions src/query/ast/tests/it/testdata/stmt.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23030,6 +23030,23 @@ DROP PROCEDURE p1()
---------- AST ------------
DropProcedure(
DropProcedureStmt {
if_exists: false,
name: ProcedureIdentity {
name: "p1",
args_type: "",
},
},
)


---------- Input ----------
drop PROCEDURE if exists p1()
---------- Output ---------
DROP PROCEDURE IF EXISTS p1()
---------- AST ------------
DropProcedure(
DropProcedureStmt {
if_exists: true,
name: ProcedureIdentity {
name: "p1",
args_type: "",
Expand All @@ -23045,6 +23062,7 @@ DROP PROCEDURE p1(Int32,STRING)
---------- AST ------------
DropProcedure(
DropProcedureStmt {
if_exists: false,
name: ProcedureIdentity {
name: "p1",
args_type: "Int32,STRING",
Expand Down Expand Up @@ -23130,6 +23148,98 @@ ShowProcedures {
}


---------- Input ----------
create or replace PROCEDURE p1() returns string not null language sql comment = 'test' as $$
BEGIN
LET sum := 0;
FOR x IN SELECT * FROM numbers(100) DO
sum := sum + x.number;
END FOR;
RETURN sum;
END;
$$;
---------- Output ---------
CREATE OR REPLACE PROCEDURE p1() RETURNS STRING NOT NULL LANGUAGE SQL COMMENT='test' AS $$
BEGIN
LET sum := 0;
FOR x IN SELECT * FROM numbers(100) DO
sum := sum + x.number;
END FOR;
RETURN sum;
END;
$$
---------- AST ------------
CreateProcedure(
CreateProcedureStmt {
create_option: CreateOrReplace,
name: ProcedureIdentity {
name: "p1",
args_type: "",
},
language: SQL,
args: None,
return_type: [
ProcedureType {
name: None,
data_type: NotNull(
String,
),
},
],
comment: Some(
"test",
),
script: "BEGIN\n LET sum := 0;\n FOR x IN SELECT * FROM numbers(100) DO\n sum := sum + x.number;\n END FOR;\n RETURN sum;\nEND;",
},
)


---------- Input ----------
create PROCEDURE if not exists p1() returns string not null language sql comment = 'test' as $$
BEGIN
LET sum := 0;
FOR x IN SELECT * FROM numbers(100) DO
sum := sum + x.number;
END FOR;
RETURN sum;
END;
$$;
---------- Output ---------
CREATE PROCEDURE IF NOT EXISTS p1() RETURNS STRING NOT NULL LANGUAGE SQL COMMENT='test' AS $$
BEGIN
LET sum := 0;
FOR x IN SELECT * FROM numbers(100) DO
sum := sum + x.number;
END FOR;
RETURN sum;
END;
$$
---------- AST ------------
CreateProcedure(
CreateProcedureStmt {
create_option: CreateIfNotExists,
name: ProcedureIdentity {
name: "p1",
args_type: "",
},
language: SQL,
args: None,
return_type: [
ProcedureType {
name: None,
data_type: NotNull(
String,
),
},
],
comment: Some(
"test",
),
script: "BEGIN\n LET sum := 0;\n FOR x IN SELECT * FROM numbers(100) DO\n sum := sum + x.number;\n END FOR;\n RETURN sum;\nEND;",
},
)


---------- Input ----------
create PROCEDURE p1() returns string not null language sql comment = 'test' as $$
BEGIN
Expand Down
25 changes: 5 additions & 20 deletions src/query/management/src/procedure/procedure_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use databend_common_meta_app::principal::procedure::ProcedureInfo;
use databend_common_meta_app::principal::procedure_id_ident::ProcedureIdIdent;
use databend_common_meta_app::principal::CreateProcedureReply;
use databend_common_meta_app::principal::CreateProcedureReq;
use databend_common_meta_app::principal::DropProcedureReq;
use databend_common_meta_app::principal::GetProcedureReply;
use databend_common_meta_app::principal::GetProcedureReq;
use databend_common_meta_app::principal::ListProcedureReq;
Expand All @@ -31,7 +30,6 @@ use databend_common_meta_app::principal::ProcedureIdToNameIdent;
use databend_common_meta_app::principal::ProcedureIdentity;
use databend_common_meta_app::principal::ProcedureMeta;
use databend_common_meta_app::principal::ProcedureNameIdent;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_app::KeyWithTenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
Expand All @@ -55,11 +53,11 @@ impl ProcedureMgr {
pub async fn create_procedure(
&self,
req: CreateProcedureReq,
overriding: bool,
) -> Result<CreateProcedureReply, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());
let name_ident = &req.name_ident;
let meta = &req.meta;
let overriding = req.create_option.is_overriding();
let name_ident_raw = serialize_struct(name_ident.procedure_name())?;

let create_res = self
Expand All @@ -74,33 +72,20 @@ impl ProcedureMgr {

match create_res {
Ok(id) => Ok(CreateProcedureReply { procedure_id: *id }),
Err(existent) => match req.create_option {
CreateOption::Create => {
Err(AppError::from(name_ident.exist_error(func_name!())).into())
}
CreateOption::CreateIfNotExists => Ok(CreateProcedureReply {
procedure_id: *existent.data,
}),
CreateOption::CreateOrReplace => {
unreachable!(
"create_procedure: CreateOrReplace should never conflict with existent"
);
}
},
Err(_) => Err(AppError::from(name_ident.exist_error(func_name!())).into()),
}
}

/// Drop the tenant's PROCEDURE by name, return the dropped one or None if nothing is dropped.
#[async_backtrace::framed]
pub async fn drop_procedure(
&self,
req: DropProcedureReq,
name_ident: &ProcedureNameIdent,
) -> Result<Option<(SeqV<ProcedureId>, SeqV<ProcedureMeta>)>, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());
let name_ident = req.name_ident;
debug!(name_ident :? =(name_ident); "SchemaApi: {}", func_name!());
let dropped = self
.kv_api
.remove_id_value(&name_ident, |id| {
.remove_id_value(name_ident, |id| {
vec![ProcedureIdToNameIdent::new_generic(name_ident.tenant(), id).to_string_key()]
})
.await?;
Expand Down
25 changes: 21 additions & 4 deletions src/query/service/src/interpreters/interpreter_procedure_create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@
use std::sync::Arc;

use databend_common_exception::Result;
use databend_common_meta_api::kv_app_error::KVAppError;
use databend_common_meta_app::app_error::AppError;
use databend_common_meta_app::principal::CreateProcedureReq;
use databend_common_meta_app::schema::CreateOption;
use databend_common_sql::plans::CreateProcedurePlan;
use databend_common_users::UserApiProvider;
use log::debug;
Expand Down Expand Up @@ -55,10 +58,24 @@ impl Interpreter for CreateProcedureInterpreter {
let tenant = self.plan.tenant.clone();

let create_procedure_req: CreateProcedureReq = self.plan.clone().into();
let _ = UserApiProvider::instance()
.add_procedure(&tenant, create_procedure_req)
.await?;
let overriding = self.plan.create_option.is_overriding();

Ok(PipelineBuildResult::create())
return if let Err(e) = UserApiProvider::instance()
.add_procedure(&tenant, create_procedure_req, overriding)
.await?
{
match e {
KVAppError::AppError(AppError::ProcedureAlreadyExists(_)) => {
if self.plan.create_option != CreateOption::CreateIfNotExists {
Err(e.into())
} else {
Ok(PipelineBuildResult::create())
}
}
_ => Err(e.into()),
}
} else {
Ok(PipelineBuildResult::create())
};
}
}
Loading

0 comments on commit 7e14ac0

Please sign in to comment.