Skip to content

Commit

Permalink
feat: implement drop table procedure (#1872)
Browse files Browse the repository at this point in the history
* feat: implement drop table procedure

* fix: fix uncaught error

* refacotr: refactor error handling

* chore: apply suggestions from CR

* refactor: move fetch_table/s to table_routes.rs

* chore: fix clippy

* chore: apply suggestions from CR

* chore: rebase onto develop

* feat: compare the table_route value before deleting

* feat: handle if table already exists on datanode

* Update src/meta-srv/src/procedure/drop_table.rs

Co-authored-by: JeremyHi <[email protected]>

---------

Co-authored-by: JeremyHi <[email protected]>
  • Loading branch information
WenyXu and fengjiachun authored Jul 7, 2023
1 parent ad165c1 commit 64acfd3
Show file tree
Hide file tree
Showing 13 changed files with 579 additions and 95 deletions.
2 changes: 1 addition & 1 deletion src/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
mod client;
pub mod client_manager;
mod database;
mod error;
pub mod error;
pub mod load_balance;
mod metrics;
mod stream_insert;
Expand Down
63 changes: 61 additions & 2 deletions src/common/meta/src/rpc/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ use std::result;

use api::v1::meta::submit_ddl_task_request::Task;
use api::v1::meta::{
CreateTableTask as PbCreateTableTask, Partition,
CreateTableTask as PbCreateTableTask, DropTableTask as PbDropTableTask, Partition,
SubmitDdlTaskRequest as PbSubmitDdlTaskRequest,
SubmitDdlTaskResponse as PbSubmitDdlTaskResponse,
};
use api::v1::CreateTableExpr;
use api::v1::{CreateTableExpr, DropTableExpr};
use prost::Message;
use serde::{Deserialize, Serialize};
use snafu::{OptionExt, ResultExt};
Expand All @@ -33,6 +33,7 @@ use crate::table_name::TableName;
#[derive(Debug)]
pub enum DdlTask {
CreateTable(CreateTableTask),
DropTable(DropTableTask),
}

impl DdlTask {
Expand All @@ -52,6 +53,7 @@ impl TryFrom<Task> for DdlTask {
Task::CreateTableTask(create_table) => {
Ok(DdlTask::CreateTable(create_table.try_into()?))
}
Task::DropTableTask(drop_table) => Ok(DdlTask::DropTable(drop_table.try_into()?)),
_ => todo!(),
}
}
Expand All @@ -71,6 +73,15 @@ impl TryFrom<SubmitDdlTaskRequest> for PbSubmitDdlTaskRequest {
create_table: Some(task.create_table),
partitions: task.partitions,
}),

DdlTask::DropTable(task) => Task::DropTableTask(PbDropTableTask {
drop_table: Some(DropTableExpr {
catalog_name: task.catalog,
schema_name: task.schema,
table_name: task.table,
table_id: Some(api::v1::TableId { id: task.table_id }),
}),
}),
};
Ok(Self {
header: None,
Expand Down Expand Up @@ -98,6 +109,54 @@ impl TryFrom<PbSubmitDdlTaskResponse> for SubmitDdlTaskResponse {
}
}

#[derive(Debug, PartialEq, Serialize, Deserialize)]
pub struct DropTableTask {
pub catalog: String,
pub schema: String,
pub table: String,
pub table_id: TableId,
}

impl DropTableTask {
pub fn table_ref(&self) -> TableReference {
TableReference {
catalog: &self.catalog,
schema: &self.schema,
table: &self.table,
}
}

pub fn table_name(&self) -> TableName {
TableName {
catalog_name: self.catalog.to_string(),
schema_name: self.schema.to_string(),
table_name: self.table.to_string(),
}
}
}

impl TryFrom<PbDropTableTask> for DropTableTask {
type Error = error::Error;

fn try_from(pb: PbDropTableTask) -> Result<Self> {
let drop_table = pb.drop_table.context(error::InvalidProtoMsgSnafu {
err_msg: "expected drop table",
})?;

Ok(Self {
catalog: drop_table.catalog_name,
schema: drop_table.schema_name,
table: drop_table.table_name,
table_id: drop_table
.table_id
.context(error::InvalidProtoMsgSnafu {
err_msg: "expected table_id",
})?
.id,
})
}
}

#[derive(Debug, PartialEq)]
pub struct CreateTableTask {
pub create_table: CreateTableExpr,
Expand Down
29 changes: 28 additions & 1 deletion src/meta-srv/src/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
use std::sync::Arc;

use client::client_manager::DatanodeClients;
use common_meta::rpc::ddl::CreateTableTask;
use common_meta::rpc::ddl::{CreateTableTask, DropTableTask};
use common_meta::rpc::router::TableRoute;
use common_procedure::{watcher, ProcedureId, ProcedureManagerRef, ProcedureWithId};
use snafu::ResultExt;

use crate::error::{self, Result};
use crate::procedure::create_table::CreateTableProcedure;
use crate::procedure::drop_table::DropTableProcedure;
use crate::service::mailbox::MailboxRef;
use crate::service::store::kv::KvStoreRef;

pub type DdlManagerRef = Arc<DdlManager>;
Expand All @@ -30,6 +32,8 @@ pub struct DdlManager {
procedure_manager: ProcedureManagerRef,
kv_store: KvStoreRef,
datanode_clients: Arc<DatanodeClients>,
mailbox: MailboxRef,
server_addr: String,
}

// TODO(weny): removes in following PRs.
Expand All @@ -38,25 +42,33 @@ pub struct DdlManager {
pub(crate) struct DdlContext {
pub(crate) kv_store: KvStoreRef,
pub(crate) datanode_clients: Arc<DatanodeClients>,
pub(crate) mailbox: MailboxRef,
pub(crate) server_addr: String,
}

impl DdlManager {
pub(crate) fn new(
procedure_manager: ProcedureManagerRef,
kv_store: KvStoreRef,
datanode_clients: Arc<DatanodeClients>,
mailbox: MailboxRef,
server_addr: String,
) -> Self {
Self {
procedure_manager,
kv_store,
datanode_clients,
mailbox,
server_addr,
}
}

pub(crate) fn create_context(&self) -> DdlContext {
DdlContext {
kv_store: self.kv_store.clone(),
datanode_clients: self.datanode_clients.clone(),
mailbox: self.mailbox.clone(),
server_addr: self.server_addr.clone(),
}
}

Expand Down Expand Up @@ -92,6 +104,21 @@ impl DdlManager {
self.submit_procedure(procedure_with_id).await
}

pub async fn submit_drop_table_task(
&self,
cluster_id: u64,
drop_table_task: DropTableTask,
table_route: TableRoute,
) -> Result<ProcedureId> {
let context = self.create_context();

let procedure = DropTableProcedure::new(cluster_id, drop_table_task, table_route, context);

let procedure_with_id = ProcedureWithId::with_random_id(Box::new(procedure));

self.submit_procedure(procedure_with_id).await
}

async fn submit_procedure(&self, procedure_with_id: ProcedureWithId) -> Result<ProcedureId> {
let procedure_id = procedure_with_id.id;

Expand Down
16 changes: 8 additions & 8 deletions src/meta-srv/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,21 @@

use common_error::prelude::*;
use common_meta::peer::Peer;
use common_runtime::JoinError;
use snafu::Location;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::oneshot::error::TryRecvError;
use tonic::codegen::http;
use tonic::Code;

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
#[snafu(display("Failed to join a future: {}", source))]
Join {
location: Location,
source: JoinError,
},

#[snafu(display("Failed to execute transaction: {}", msg))]
Txn { location: Location, msg: String },

Expand All @@ -37,12 +43,6 @@ pub enum Error {
found: u64,
},

#[snafu(display("Failed to receive status, source: {}", source,))]
TryReceiveStatus {
location: Location,
source: TryRecvError,
},

#[snafu(display(
"Failed to request Datanode, expected: {}, but only {} available",
expected,
Expand Down Expand Up @@ -467,7 +467,7 @@ impl ErrorExt for Error {
| Error::StartGrpc { .. }
| Error::Combine { .. }
| Error::NoEnoughAvailableDatanode { .. }
| Error::TryReceiveStatus { .. } => StatusCode::Internal,
| Error::Join { .. } => StatusCode::Internal,
Error::EmptyKey { .. }
| Error::MissingRequiredParameter { .. }
| Error::MissingRequestHeader { .. }
Expand Down
1 change: 1 addition & 0 deletions src/meta-srv/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#![feature(async_closure)]
#![feature(btree_drain_filter)]
#![feature(result_flattening)]

pub mod bootstrap;
pub mod cluster;
Expand Down
20 changes: 11 additions & 9 deletions src/meta-srv/src/metasrv/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,17 @@ impl MetaSrvBuilder {
.unwrap_or_else(|| Arc::new(DefaultMetadataService::new(kv_store.clone())));
let lock = lock.unwrap_or_else(|| Arc::new(MemLock::default()));

// TODO(weny): considers to modify the default config of procedure manager
let ddl_manager = Arc::new(DdlManager::new(
procedure_manager.clone(),
kv_store.clone(),
Arc::new(DatanodeClients::default()),
mailbox.clone(),
options.server_addr.clone(),
));

let _ = ddl_manager.try_start();

let handler_group = match handler_group {
Some(handler_group) => handler_group,
None => {
Expand Down Expand Up @@ -212,15 +223,6 @@ impl MetaSrvBuilder {
}
};

// TODO(weny): considers to modify the default config of procedure manager
let ddl_manager = Arc::new(DdlManager::new(
procedure_manager.clone(),
kv_store.clone(),
Arc::new(DatanodeClients::default()),
));

let _ = ddl_manager.try_start();

Ok(MetaSrv {
started,
options,
Expand Down
2 changes: 2 additions & 0 deletions src/meta-srv/src/procedure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,7 @@
// limitations under the License.

pub mod create_table;
pub mod drop_table;
pub mod region_failover;
pub(crate) mod state_store;
mod utils;
44 changes: 9 additions & 35 deletions src/meta-srv/src/procedure/create_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,14 @@ use common_meta::rpc::ddl::CreateTableTask;
use common_meta::rpc::router::TableRoute;
use common_meta::table_name::TableName;
use common_procedure::error::{FromJsonSnafu, Result as ProcedureResult, ToJsonSnafu};
use common_procedure::{
Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure, Status,
};
use common_procedure::{Context as ProcedureContext, LockKey, Procedure, Status};
use futures::future::join_all;
use serde::{Deserialize, Serialize};
use snafu::{ensure, ResultExt};
use table::engine::TableReference;
use table::metadata::TableId;

use super::utils::{handle_request_datanode_error, handle_retry_error};
use crate::ddl::DdlContext;
use crate::error::{self, Result};
use crate::service::router::create_table_global_value;
Expand Down Expand Up @@ -212,14 +211,9 @@ impl CreateTableProcedure {
create_expr_for_region.region_numbers = regions;

joins.push(common_runtime::spawn_bg(async move {
if let Err(err) = client
.create(create_expr_for_region)
.await
.context(error::RequestDatanodeSnafu { peer: datanode })
{
// TODO(weny): add tests for `TableAlreadyExists`
if let Err(err) = client.create(create_expr_for_region).await {
if err.status_code() != StatusCode::TableAlreadyExists {
return Err(err);
return Err(handle_request_datanode_error(datanode)(err));
}
}
Ok(())
Expand All @@ -229,17 +223,7 @@ impl CreateTableProcedure {
let _ = join_all(joins)
.await
.into_iter()
.map(|result| {
result.map_err(|err| {
error::RetryLaterSnafu {
reason: format!(
"Failed to execute create table on datanode, source: {}",
err
),
}
.build()
})
})
.map(|e| e.context(error::JoinSnafu))
.collect::<Result<Vec<_>>>()?;

self.creator.data.state = CreateTableState::CreateMetadata;
Expand All @@ -255,22 +239,12 @@ impl Procedure for CreateTableProcedure {
}

async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
let error_handler = |e| {
if matches!(e, error::Error::RetryLater { .. }) {
ProcedureError::retry_later(e)
} else {
ProcedureError::external(e)
}
};
match self.creator.data.state {
CreateTableState::Prepare => self.on_prepare().await.map_err(error_handler),
CreateTableState::DatanodeCreateTable => {
self.on_datanode_create_table().await.map_err(error_handler)
}
CreateTableState::CreateMetadata => {
self.on_create_metadata().await.map_err(error_handler)
}
CreateTableState::Prepare => self.on_prepare().await,
CreateTableState::DatanodeCreateTable => self.on_datanode_create_table().await,
CreateTableState::CreateMetadata => self.on_create_metadata().await,
}
.map_err(handle_retry_error)
}

fn dump(&self) -> ProcedureResult<String> {
Expand Down
Loading

0 comments on commit 64acfd3

Please sign in to comment.