Skip to content

Commit

Permalink
feat: support renaming table in the catalog manger
Browse files Browse the repository at this point in the history
  • Loading branch information
e1ijah1 committed Jan 4, 2023
1 parent f1b95e2 commit 9fa1419
Show file tree
Hide file tree
Showing 11 changed files with 102 additions and 20 deletions.
27 changes: 27 additions & 0 deletions src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,10 @@ pub trait CatalogManager: CatalogList {
/// schema registered.
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool>;

/// Rename a table within given catalog/schema/table_name to catalog manager,
/// returns whether the table renamed.
async fn rename_table(&self, request: RenameTableRequest) -> Result<bool>;

/// Register a system table, should be called before starting the manager.
async fn register_system_table(&self, request: RegisterSystemTableRequest)
-> error::Result<()>;
Expand Down Expand Up @@ -142,6 +146,29 @@ impl Debug for RegisterTableRequest {
}
}

#[derive(Clone)]
pub struct RenameTableRequest {
pub catalog: String,
pub schema: String,
pub table_name: String,
pub new_table_name: String,
pub table_id: TableId,
pub table: TableRef,
}

impl Debug for RenameTableRequest {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
f.debug_struct("RenameTableRequest")
.field("catalog", &self.catalog)
.field("schema", &self.schema)
.field("table_name", &self.table_name)
.field("new_table_name", &self.new_table_name)
.field("table_id", &self.table_id)
.field("table", &self.table.table_info())
.finish()
}
}

#[derive(Clone)]
pub struct DeregisterTableRequest {
pub catalog: String,
Expand Down
8 changes: 7 additions & 1 deletion src/catalog/src/local/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ use crate::tables::SystemCatalog;
use crate::{
format_full_table_name, handle_system_table_request, CatalogList, CatalogManager,
CatalogProvider, CatalogProviderRef, DeregisterTableRequest, RegisterSchemaRequest,
RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, SchemaProviderRef,
RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, SchemaProvider,
SchemaProviderRef,
};

/// A `CatalogManager` consists of a system catalog and a bunch of user catalogs.
Expand Down Expand Up @@ -377,6 +378,11 @@ impl CatalogManager for LocalCatalogManager {
}
}

async fn rename_table(&self, _request: RenameTableRequest) -> Result<bool> {
// todo impl rename_table for catalog manager
todo!()
}

async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result<bool> {
UnimplementedSnafu {
operation: "deregister table",
Expand Down
8 changes: 7 additions & 1 deletion src/catalog/src/local/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExist
use crate::schema::SchemaProvider;
use crate::{
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProviderRef,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
SchemaProviderRef,
};

/// Simple in-memory list of catalogs
Expand Down Expand Up @@ -88,6 +89,11 @@ impl CatalogManager for MemoryCatalogManager {
.map(|v| v.is_none())
}

async fn rename_table(&self, _request: RenameTableRequest) -> Result<bool> {
// todo impl rename_table for catalog manager
todo!()
}

async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<bool> {
let catalogs = self.catalogs.write().unwrap();
let catalog = catalogs
Expand Down
7 changes: 6 additions & 1 deletion src/catalog/src/remote/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::remote::{Kv, KvBackendRef};
use crate::{
handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef,
DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest,
RegisterTableRequest, SchemaProvider, SchemaProviderRef,
RegisterTableRequest, RenameTableRequest, SchemaProvider, SchemaProviderRef,
};

/// Catalog manager based on metasrv.
Expand Down Expand Up @@ -448,6 +448,11 @@ impl CatalogManager for RemoteCatalogManager {
Ok(true)
}

async fn rename_table(&self, _request: RenameTableRequest) -> Result<bool> {
// todo impl rename_table for catalog manager
todo!()
}

async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
let mut requests = self.system_table_requests.lock().await;
requests.push(request);
Expand Down
7 changes: 7 additions & 0 deletions src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,12 @@ pub enum Error {
source: catalog::error::Error,
},

#[snafu(display("Failed to rename table, source: {}", source))]
RenameTable {
#[snafu(backtrace)]
source: catalog::error::Error,
},

#[snafu(display("Failed to register a new schema, source: {}", source))]
RegisterSchema {
#[snafu(backtrace)]
Expand Down Expand Up @@ -365,6 +371,7 @@ impl ErrorExt for Error {
| Error::StartGrpc { .. }
| Error::CreateDir { .. }
| Error::InsertSystemCatalog { .. }
| Error::RenameTable { .. }
| Error::RegisterSchema { .. }
| Error::Catalog { .. }
| Error::MissingRequiredField { .. }
Expand Down
34 changes: 26 additions & 8 deletions src/datanode/src/sql/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use catalog::RenameTableRequest;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::Output;
use snafu::prelude::*;
Expand All @@ -28,11 +29,11 @@ impl SqlHandler {
let ctx = EngineContext {};
let catalog_name = req.catalog_name.as_deref().unwrap_or(DEFAULT_CATALOG_NAME);
let schema_name = req.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME);
let table_name = &req.table_name.to_string();
let table_name = req.table_name.clone();
let table_ref = TableReference {
catalog: catalog_name,
schema: schema_name,
table: table_name,
table: &table_name,
};

let full_table_name = table_ref.to_string();
Expand All @@ -43,12 +44,29 @@ impl SqlHandler {
table_name: &full_table_name,
}
);
self.table_engine
.alter_table(&ctx, req)
.await
.context(error::AlterTableSnafu {
table_name: full_table_name,
})?;
let kind = req.alter_kind.clone();
let table =
self.table_engine
.alter_table(&ctx, req)
.await
.context(error::AlterTableSnafu {
table_name: full_table_name,
})?;
let table_info = &table.table_info();
if let AlterKind::RenameTable { .. } = kind {
let rename_table_req = RenameTableRequest {
catalog: table_info.catalog_name.clone(),
schema: table_info.schema_name.clone(),
table_name,
new_table_name: table_info.name.clone(),
table_id: table_info.ident.table_id,
table,
};
self.catalog_manager
.rename_table(rename_table_req)
.await
.context(error::RenameTableSnafu)?;
}
// Tried in MySQL, it really prints "Affected Rows: 0".
Ok(Output::AffectedRows(0))
}
Expand Down
8 changes: 6 additions & 2 deletions src/frontend/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ use catalog::helper::{
use catalog::remote::{Kv, KvBackendRef};
use catalog::{
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider,
SchemaProviderRef,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
SchemaProvider, SchemaProviderRef,
};
use futures::StreamExt;
use meta_client::rpc::TableName;
Expand Down Expand Up @@ -97,6 +97,10 @@ impl CatalogManager for FrontendCatalogManager {
unimplemented!()
}

async fn rename_table(&self, _request: RenameTableRequest) -> catalog_err::Result<bool> {
unimplemented!()
}

async fn register_system_table(
&self,
_request: RegisterSystemTableRequest,
Expand Down
15 changes: 12 additions & 3 deletions src/mito/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ use store_api::storage::{
};
use table::engine::{EngineContext, TableEngine, TableReference};
use table::metadata::{TableId, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion};
use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest};
use table::requests::{
AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
};
use table::table::TableRef;
use table::{error as table_error, Result as TableResult, Table};
use tokio::sync::Mutex;
Expand Down Expand Up @@ -491,7 +493,7 @@ impl<S: StorageEngine> MitoEngineInner<S> {
let schema_name = req.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME);
let table_name = &req.table_name.clone();

let table_ref = TableReference {
let mut table_ref = TableReference {
catalog: catalog_name,
schema: schema_name,
table: table_name,
Expand All @@ -502,9 +504,16 @@ impl<S: StorageEngine> MitoEngineInner<S> {

logging::info!("start altering table {} with request {:?}", table_name, req);
table
.alter(req)
.alter(&req)
.await
.context(error::AlterTableSnafu { table_name })?;

if let AlterKind::RenameTable { new_table_name } = &req.alter_kind {
table_ref.table = new_table_name.as_str();
let mut tables = self.tables.write().unwrap();
tables.remove(&table_ref.to_string());
tables.insert(table_ref.to_string(), table.clone());
}
Ok(table)
}

Expand Down
2 changes: 1 addition & 1 deletion src/mito/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ impl<R: Region> Table for MitoTable<R> {
}

/// Alter table changes the schemas of the table.
async fn alter(&self, req: AlterTableRequest) -> TableResult<()> {
async fn alter(&self, req: &AlterTableRequest) -> TableResult<()> {
let _lock = self.alter_lock.lock().await;

let table_info = self.table_info();
Expand Down
4 changes: 2 additions & 2 deletions src/table/src/requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@ pub struct AlterTableRequest {
}

/// Add column request
#[derive(Debug)]
#[derive(Debug, Clone)]
pub struct AddColumnRequest {
pub column_schema: ColumnSchema,
pub is_key: bool,
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum AlterKind {
AddColumns { columns: Vec<AddColumnRequest> },
DropColumns { names: Vec<String> },
Expand Down
2 changes: 1 addition & 1 deletion src/table/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ pub trait Table: Send + Sync {
Ok(FilterPushDownType::Unsupported)
}

async fn alter(&self, request: AlterTableRequest) -> Result<()> {
async fn alter(&self, request: &AlterTableRequest) -> Result<()> {
let _ = request;
unimplemented!()
}
Expand Down

0 comments on commit 9fa1419

Please sign in to comment.