Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
TCeason committed Sep 25, 2024
1 parent f4b3ec6 commit 1fb3b9a
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 57 deletions.
14 changes: 2 additions & 12 deletions src/meta/app/src/principal/procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use databend_common_expression::types::DataType;
use crate::principal::procedure_id_ident::ProcedureIdIdent;
use crate::principal::procedure_name_ident::ProcedureNameIdent;
use crate::principal::ProcedureIdentity;
use crate::schema::CreateOption;
use crate::tenant::Tenant;
use crate::tenant::ToTenant;
use crate::KeyWithTenant;
Expand Down Expand Up @@ -85,22 +84,15 @@ impl Display for ProcedureMeta {

#[derive(Clone, Debug, PartialEq)]
pub struct CreateProcedureReq {
pub create_option: CreateOption,
pub name_ident: ProcedureNameIdent,
pub meta: ProcedureMeta,
}

impl Display for CreateProcedureReq {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
let typ = match self.create_option {
CreateOption::Create => "create_procedure",
CreateOption::CreateIfNotExists => "create_procedure_if_not_exists",
CreateOption::CreateOrReplace => "create_or_replace_procedure",
};
write!(
f,
"{}:{}/{}={:?}",
typ,
"{}/{}={:?}",
self.name_ident.tenant_name(),
self.name_ident.procedure_name(),
self.meta
Expand Down Expand Up @@ -137,16 +129,14 @@ pub struct RenameProcedureReply {}

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct DropProcedureReq {
pub if_exists: bool,
pub name_ident: ProcedureNameIdent,
}

impl Display for DropProcedureReq {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(
f,
"drop_procedure(if_exists={}):{}/{}",
self.if_exists,
"drop_procedure:{}/{}",
self.name_ident.tenant_name(),
self.name_ident.procedure_name(),
)
Expand Down
35 changes: 5 additions & 30 deletions src/query/management/src/procedure/procedure_mgr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ use databend_common_meta_app::principal::procedure::ProcedureInfo;
use databend_common_meta_app::principal::procedure_id_ident::ProcedureIdIdent;
use databend_common_meta_app::principal::CreateProcedureReply;
use databend_common_meta_app::principal::CreateProcedureReq;
use databend_common_meta_app::principal::DropProcedureReq;
use databend_common_meta_app::principal::GetProcedureReply;
use databend_common_meta_app::principal::GetProcedureReq;
use databend_common_meta_app::principal::ListProcedureReq;
Expand All @@ -31,7 +30,6 @@ use databend_common_meta_app::principal::ProcedureIdToNameIdent;
use databend_common_meta_app::principal::ProcedureIdentity;
use databend_common_meta_app::principal::ProcedureMeta;
use databend_common_meta_app::principal::ProcedureNameIdent;
use databend_common_meta_app::schema::CreateOption;
use databend_common_meta_app::KeyWithTenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
Expand All @@ -55,11 +53,11 @@ impl ProcedureMgr {
pub async fn create_procedure(
&self,
req: CreateProcedureReq,
overriding: bool,
) -> Result<CreateProcedureReply, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());
let name_ident = &req.name_ident;
let meta = &req.meta;
let overriding = req.create_option.is_overriding();
let name_ident_raw = serialize_struct(name_ident.procedure_name())?;

let create_res = self
Expand All @@ -74,46 +72,23 @@ impl ProcedureMgr {

match create_res {
Ok(id) => Ok(CreateProcedureReply { procedure_id: *id }),
Err(existent) => match req.create_option {
CreateOption::Create => {
Err(AppError::from(name_ident.exist_error(func_name!())).into())
}
CreateOption::CreateIfNotExists => Ok(CreateProcedureReply {
procedure_id: *existent.data,
}),
CreateOption::CreateOrReplace => {
let res = self
.kv_api
.update_id_value(name_ident, meta.clone())
.await?;

if let Some((id, _meta)) = res {
Ok(CreateProcedureReply { procedure_id: *id })
} else {
Err(AppError::from(name_ident.unknown_error(func_name!())).into())
}
}
},
Err(_) => Err(AppError::from(name_ident.exist_error(func_name!())).into()),
}
}

/// Drop the tenant's PROCEDURE by name, return the dropped one or None if nothing is dropped.
#[async_backtrace::framed]
pub async fn drop_procedure(
&self,
req: DropProcedureReq,
name_ident: &ProcedureNameIdent,
) -> Result<Option<(SeqV<ProcedureId>, SeqV<ProcedureMeta>)>, KVAppError> {
debug!(req :? =(&req); "SchemaApi: {}", func_name!());
let name_ident = req.name_ident;
debug!(name_ident :? =(name_ident); "SchemaApi: {}", func_name!());
let dropped = self
.kv_api
.remove_id_value(&name_ident, |id| {
.remove_id_value(name_ident, |id| {
vec![ProcedureIdToNameIdent::new_generic(name_ident.tenant(), id).to_string_key()]
})
.await?;
if dropped.is_none() && !req.if_exists {
return Err(AppError::from(name_ident.unknown_error("drop procedure")).into());
}
Ok(dropped)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,11 @@ impl Interpreter for CreateProcedureInterpreter {
let tenant = self.plan.tenant.clone();

let create_procedure_req: CreateProcedureReq = self.plan.clone().into();
let overriding = self.plan.create_option.is_overriding();

let _ = UserApiProvider::instance()
.add_procedure(&tenant, create_procedure_req)
.add_procedure(&tenant, create_procedure_req, overriding)
.await?;

Ok(PipelineBuildResult::create())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ impl Interpreter for DropProcedureInterpreter {

let drop_procedure_req: DropProcedureReq = self.plan.clone().into();
let _ = UserApiProvider::instance()
.drop_procedure(&tenant, drop_procedure_req)
.drop_procedure(&tenant, drop_procedure_req, self.plan.if_exists)
.await?;

Ok(PipelineBuildResult::create())
Expand Down
8 changes: 1 addition & 7 deletions src/query/sql/src/planner/plans/ddl/procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ pub struct CreateProcedurePlan {
impl From<CreateProcedurePlan> for CreateProcedureReq {
fn from(p: CreateProcedurePlan) -> Self {
CreateProcedureReq {
create_option: p.create_option,
name_ident: p.name,
meta: p.meta,
}
Expand All @@ -56,7 +55,6 @@ impl From<CreateProcedurePlan> for CreateProcedureReq {
impl From<&CreateProcedurePlan> for CreateProcedureReq {
fn from(p: &CreateProcedurePlan) -> Self {
CreateProcedureReq {
create_option: p.create_option,
name_ident: p.name.clone(),
meta: p.meta.clone(),
}
Expand All @@ -72,17 +70,13 @@ pub struct DropProcedurePlan {

impl From<DropProcedurePlan> for DropProcedureReq {
fn from(p: DropProcedurePlan) -> Self {
DropProcedureReq {
if_exists: p.if_exists,
name_ident: p.name,
}
DropProcedureReq { name_ident: p.name }
}
}

impl From<&DropProcedurePlan> for DropProcedureReq {
fn from(p: &DropProcedurePlan) -> Self {
DropProcedureReq {
if_exists: p.if_exists,
name_ident: p.name.clone(),
}
}
Expand Down
32 changes: 27 additions & 5 deletions src/query/users/src/user_procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use databend_common_meta_app::app_error::AppError;
use databend_common_meta_app::principal::CreateProcedureReply;
use databend_common_meta_app::principal::CreateProcedureReq;
use databend_common_meta_app::principal::DropProcedureReq;
use databend_common_meta_app::principal::GetProcedureReply;
Expand All @@ -26,10 +28,16 @@ use crate::UserApiProvider;
impl UserApiProvider {
// Add a new Procedure.
#[async_backtrace::framed]
pub async fn add_procedure(&self, tenant: &Tenant, req: CreateProcedureReq) -> Result<()> {
pub async fn add_procedure(
&self,
tenant: &Tenant,
req: CreateProcedureReq,
overriding: bool,
) -> Result<CreateProcedureReply> {
let procedure_api = self.procedure_api(tenant);
let _ = procedure_api.create_procedure(req).await?;
Ok(())
let replay = procedure_api.create_procedure(req, overriding).await?;

Ok(replay)
}

#[async_backtrace::framed]
Expand All @@ -48,8 +56,22 @@ impl UserApiProvider {

// Drop a Procedure by name.
#[async_backtrace::framed]
pub async fn drop_procedure(&self, tenant: &Tenant, req: DropProcedureReq) -> Result<()> {
let _ = self.procedure_api(tenant).drop_procedure(req).await?;
pub async fn drop_procedure(
&self,
tenant: &Tenant,
req: DropProcedureReq,
if_exists: bool,
) -> Result<()> {
let dropped = self
.procedure_api(tenant)
.drop_procedure(&req.name_ident)
.await?;
if dropped.is_none() && !if_exists {
return Err(ErrorCode::UnknownProcedure(format!(
"Unknown procedure '{}' while drop procedure",
req.name_ident
)));
}
Ok(())
}
}

0 comments on commit 1fb3b9a

Please sign in to comment.