From 6775c5be8791afc9a16f1cd9d5f49c0525beaed1 Mon Sep 17 00:00:00 2001 From: elijah <30852919+e1ijah1@users.noreply.github.com> Date: Thu, 12 Jan 2023 19:48:18 +0800 Subject: [PATCH] feat: support renaming table in the catalog manger (#824) * feat: support renaming table in the catalog manger * feat: implement rename table for local catalog manager * chore: fmt code * fix: update system catalog when renaming table in local catalog manager * chore: add instance test for rename table * chore: fix frontend test * chore: fix comment * chore: fix rename table test * fix: renaming a table with an existing name * fix: improve the system catalog's renaming process * chore: improve the code * chore: improve the comment Co-authored-by: Yingwen * chore: improve the code * chore: fix tests * chore: fix instance_test * chore: improve the code Co-authored-by: Yingwen --- src/catalog/src/lib.rs | 12 ++ src/catalog/src/local/manager.rs | 42 ++++++- src/catalog/src/local/memory.rs | 119 +++++++++++++++++++- src/catalog/src/remote/manager.rs | 16 ++- src/catalog/src/schema.rs | 4 + src/catalog/src/system.rs | 35 ++++-- src/catalog/src/tables.rs | 11 +- src/catalog/tests/local_catalog_tests.rs | 40 ++++++- src/datanode/src/error.rs | 7 ++ src/datanode/src/mock.rs | 38 +++++-- src/datanode/src/sql.rs | 6 + src/datanode/src/sql/alter.rs | 33 ++++-- src/datanode/src/tests/instance_test.rs | 52 +++++++++ src/frontend/src/catalog.rs | 12 +- src/frontend/src/instance/distributed.rs | 2 +- src/frontend/src/table.rs | 4 +- src/frontend/src/tests.rs | 1 + src/mito/src/engine.rs | 70 +++++++++++- src/mito/src/table.rs | 2 +- src/query/src/datafusion/catalog_adapter.rs | 4 + src/table/src/requests.rs | 10 +- src/table/src/table.rs | 2 +- 22 files changed, 467 insertions(+), 55 deletions(-) diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 5f1d2d89fb38..a387356db2c2 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -97,6 +97,9 @@ pub trait CatalogManager: CatalogList { /// schema registered. async fn register_schema(&self, request: RegisterSchemaRequest) -> Result; + /// Rename a table to [RenameTableRequest::new_table_name], returns whether the table is renamed. + async fn rename_table(&self, request: RenameTableRequest) -> Result; + /// Register a system table, should be called before starting the manager. async fn register_system_table(&self, request: RegisterSystemTableRequest) -> error::Result<()>; @@ -142,6 +145,15 @@ impl Debug for RegisterTableRequest { } } +#[derive(Debug, Clone)] +pub struct RenameTableRequest { + pub catalog: String, + pub schema: String, + pub table_name: String, + pub new_table_name: String, + pub table_id: TableId, +} + #[derive(Clone)] pub struct DeregisterTableRequest { pub catalog: String, diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index 375967dab870..27178941ee0f 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -47,7 +47,8 @@ use crate::tables::SystemCatalog; use crate::{ format_full_table_name, handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest, RegisterSchemaRequest, - RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, SchemaProviderRef, + RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, SchemaProvider, + SchemaProviderRef, }; /// A `CatalogManager` consists of a system catalog and a bunch of user catalogs. @@ -379,6 +380,45 @@ impl CatalogManager for LocalCatalogManager { } } + async fn rename_table(&self, request: RenameTableRequest) -> Result { + let started = self.init_lock.lock().await; + + ensure!( + *started, + IllegalManagerStateSnafu { + msg: "Catalog manager not started", + } + ); + + let catalog_name = &request.catalog; + let schema_name = &request.schema; + + let catalog = self + .catalogs + .catalog(catalog_name)? + .context(CatalogNotFoundSnafu { catalog_name })?; + + let schema = catalog + .schema(schema_name)? + .with_context(|| SchemaNotFoundSnafu { + catalog: catalog_name, + schema: schema_name, + })?; + + // rename table in system catalog + self.system + .register_table( + catalog_name.clone(), + schema_name.clone(), + request.new_table_name.clone(), + request.table_id, + ) + .await?; + Ok(schema + .rename_table(&request.table_name, request.new_table_name) + .is_ok()) + } + async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result { UnimplementedSnafu { operation: "deregister table", diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index e4e1dc0da405..285809321951 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -25,11 +25,14 @@ use table::metadata::TableId; use table::table::TableIdProvider; use table::TableRef; -use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu}; +use crate::error::{ + CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, TableNotFoundSnafu, +}; use crate::schema::SchemaProvider; use crate::{ CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest, - RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProviderRef, + RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, + SchemaProviderRef, }; /// Simple in-memory list of catalogs @@ -89,6 +92,25 @@ impl CatalogManager for MemoryCatalogManager { .map(|v| v.is_none()) } + async fn rename_table(&self, request: RenameTableRequest) -> Result { + let catalogs = self.catalogs.write().unwrap(); + let catalog = catalogs + .get(&request.catalog) + .context(CatalogNotFoundSnafu { + catalog_name: &request.catalog, + })? + .clone(); + let schema = catalog + .schema(&request.schema)? + .with_context(|| SchemaNotFoundSnafu { + catalog: &request.catalog, + schema: &request.schema, + })?; + Ok(schema + .rename_table(&request.table_name, request.new_table_name) + .is_ok()) + } + async fn deregister_table(&self, request: DeregisterTableRequest) -> Result { let catalogs = self.catalogs.write().unwrap(); let catalog = catalogs @@ -290,6 +312,20 @@ impl SchemaProvider for MemorySchemaProvider { } } + fn rename_table(&self, name: &str, new_name: String) -> Result { + let mut tables = self.tables.write().unwrap(); + if tables.get(name).is_some() { + let table = tables.remove(name).unwrap(); + tables.insert(new_name, table.clone()); + Ok(table) + } else { + TableNotFoundSnafu { + table_info: name.to_string(), + } + .fail()? + } + } + fn deregister_table(&self, name: &str) -> Result> { let mut tables = self.tables.write().unwrap(); Ok(tables.remove(name)) @@ -354,6 +390,85 @@ mod tests { assert_eq!(StatusCode::TableAlreadyExists, err.status_code()); } + #[tokio::test] + async fn test_mem_provider_rename_table() { + let provider = MemorySchemaProvider::new(); + let table_name = "num"; + assert!(!provider.table_exist(table_name).unwrap()); + let test_table: TableRef = Arc::new(NumbersTable::default()); + // register test table + assert!(provider + .register_table(table_name.to_string(), test_table.clone()) + .unwrap() + .is_none()); + assert!(provider.table_exist(table_name).unwrap()); + + // rename test table + let new_table_name = "numbers"; + provider + .rename_table(table_name, new_table_name.to_string()) + .unwrap(); + + // test old table name not exist + assert!(!provider.table_exist(table_name).unwrap()); + assert!(provider.deregister_table(table_name).unwrap().is_none()); + + // test new table name exists + assert!(provider.table_exist(new_table_name).unwrap()); + let registered_table = provider.table(new_table_name).unwrap().unwrap(); + assert_eq!( + registered_table.table_info().ident.table_id, + test_table.table_info().ident.table_id + ); + + let other_table = Arc::new(NumbersTable::new(2)); + let result = provider.register_table(new_table_name.to_string(), other_table); + let err = result.err().unwrap(); + assert_eq!(StatusCode::TableAlreadyExists, err.status_code()); + } + + #[tokio::test] + async fn test_catalog_rename_table() { + let catalog = MemoryCatalogManager::default(); + let schema = catalog + .schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME) + .unwrap() + .unwrap(); + + // register table + let table_name = "num"; + let table_id = 2333; + let table: TableRef = Arc::new(NumbersTable::new(table_id)); + let register_table_req = RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + table_id, + table, + }; + assert!(catalog.register_table(register_table_req).await.unwrap()); + assert!(schema.table_exist(table_name).unwrap()); + + // rename table + let new_table_name = "numbers"; + let rename_table_req = RenameTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + new_table_name: new_table_name.to_string(), + table_id, + }; + assert!(catalog.rename_table(rename_table_req).await.unwrap()); + assert!(!schema.table_exist(table_name).unwrap()); + assert!(schema.table_exist(new_table_name).unwrap()); + + let registered_table = catalog + .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name) + .unwrap() + .unwrap(); + assert_eq!(registered_table.table_info().ident.table_id, table_id); + } + #[test] pub fn test_register_if_absent() { let list = MemoryCatalogManager::default(); diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 36659c5c04ab..a930c29d320c 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -43,7 +43,7 @@ use crate::remote::{Kv, KvBackendRef}; use crate::{ handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest, - RegisterTableRequest, SchemaProvider, SchemaProviderRef, + RegisterTableRequest, RenameTableRequest, SchemaProvider, SchemaProviderRef, }; /// Catalog manager based on metasrv. @@ -449,6 +449,13 @@ impl CatalogManager for RemoteCatalogManager { Ok(true) } + async fn rename_table(&self, _request: RenameTableRequest) -> Result { + UnimplementedSnafu { + operation: "rename table", + } + .fail() + } + async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> { let mut requests = self.system_table_requests.lock().await; requests.push(request); @@ -739,6 +746,13 @@ impl SchemaProvider for RemoteSchemaProvider { prev } + fn rename_table(&self, _name: &str, _new_name: String) -> Result { + UnimplementedSnafu { + operation: "rename table", + } + .fail() + } + fn deregister_table(&self, name: &str) -> Result> { let table_name = name.to_string(); let table_key = self.build_regional_table_key(&table_name).to_string(); diff --git a/src/catalog/src/schema.rs b/src/catalog/src/schema.rs index 3f91f8d81b12..0dc9ddab88fe 100644 --- a/src/catalog/src/schema.rs +++ b/src/catalog/src/schema.rs @@ -35,6 +35,10 @@ pub trait SchemaProvider: Sync + Send { /// If a table of the same name existed before, it returns "Table already exists" error. fn register_table(&self, name: String, table: TableRef) -> Result>; + /// If supported by the implementation, renames an existing table from this schema and returns it. + /// If no table of that name exists, returns "Table not found" error. + fn rename_table(&self, name: &str, new_name: String) -> Result; + /// If supported by the implementation, removes an existing table from this schema and returns it. /// If no table of that name exists, returns Ok(None). fn deregister_table(&self, name: &str) -> Result>; diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index d135365c4ce4..c163979cb3bc 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -186,11 +186,23 @@ fn build_system_catalog_schema() -> Schema { SchemaBuilder::try_from(cols).unwrap().build().unwrap() } -pub fn build_table_insert_request(full_table_name: String, table_id: TableId) -> InsertRequest { +/// Formats key string for table entry in system catalog +#[inline] +pub fn format_table_entry_key(catalog: &str, schema: &str, table_id: TableId) -> String { + format!("{catalog}.{schema}.{table_id}") +} + +pub fn build_table_insert_request( + catalog: String, + schema: String, + table_name: String, + table_id: TableId, +) -> InsertRequest { + let entry_key = format_table_entry_key(&catalog, &schema, table_id); build_insert_request( EntryType::Table, - full_table_name.as_bytes(), - serde_json::to_string(&TableEntryValue { table_id }) + entry_key.as_bytes(), + serde_json::to_string(&TableEntryValue { table_name }) .unwrap() .as_bytes(), ) @@ -285,8 +297,8 @@ pub fn decode_system_catalog( } EntryType::Table => { - // As for table entry, the key is a string with format: `..` - // and the value is a JSON string with format: `{"table_id": }` + // As for table entry, the key is a string with format: `..` + // and the value is a JSON string with format: `{"table_name": }` let table_parts = key.split('.').collect::>(); ensure!( table_parts.len() >= 3, @@ -298,11 +310,12 @@ pub fn decode_system_catalog( debug!("Table meta value: {}", String::from_utf8_lossy(value)); let table_meta: TableEntryValue = serde_json::from_slice(value).context(ValueDeserializeSnafu)?; + let table_id = table_parts[2].parse::().unwrap(); Ok(Entry::Table(TableEntry { catalog_name: table_parts[0].to_string(), schema_name: table_parts[1].to_string(), - table_name: table_parts[2].to_string(), - table_id: table_meta.table_id, + table_name: table_meta.table_name, + table_id, })) } } @@ -362,7 +375,7 @@ pub struct TableEntry { #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct TableEntryValue { - pub table_id: TableId, + pub table_name: String, } #[cfg(test)] @@ -415,8 +428,8 @@ mod tests { pub fn test_decode_table() { let entry = decode_system_catalog( Some(EntryType::Table as u8), - Some("some_catalog.some_schema.some_table".as_bytes()), - Some("{\"table_id\":42}".as_bytes()), + Some("some_catalog.some_schema.42".as_bytes()), + Some("{\"table_name\":\"some_table\"}".as_bytes()), ) .unwrap(); @@ -435,7 +448,7 @@ mod tests { pub fn test_decode_mismatch() { decode_system_catalog( Some(EntryType::Table as u8), - Some("some_catalog.some_schema.some_table".as_bytes()), + Some("some_catalog.some_schema.42".as_bytes()), None, ) .unwrap(); diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs index 1485c51e1d7b..971b7524900c 100644 --- a/src/catalog/src/tables.rs +++ b/src/catalog/src/tables.rs @@ -40,9 +40,7 @@ use table::{Table, TableRef}; use crate::error::{Error, InsertCatalogRecordSnafu}; use crate::system::{build_schema_insert_request, build_table_insert_request, SystemCatalogTable}; -use crate::{ - format_full_table_name, CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef, -}; +use crate::{CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef}; /// Tables holds all tables created by user. pub struct Tables { @@ -233,6 +231,10 @@ impl SchemaProvider for InformationSchema { panic!("System catalog & schema does not support register table") } + fn rename_table(&self, _name: &str, _new_name: String) -> crate::error::Result { + unimplemented!("System catalog & schema does not support rename table") + } + fn deregister_table(&self, _name: &str) -> crate::error::Result> { panic!("System catalog & schema does not support deregister table") } @@ -269,8 +271,7 @@ impl SystemCatalog { table_name: String, table_id: TableId, ) -> crate::error::Result { - let full_table_name = format_full_table_name(&catalog, &schema, &table_name); - let request = build_table_insert_request(full_table_name, table_id); + let request = build_table_insert_request(catalog, schema, table_name, table_id); self.information_schema .system .insert(request) diff --git a/src/catalog/tests/local_catalog_tests.rs b/src/catalog/tests/local_catalog_tests.rs index f40639ed4bc6..1a68ef96ab07 100644 --- a/src/catalog/tests/local_catalog_tests.rs +++ b/src/catalog/tests/local_catalog_tests.rs @@ -17,7 +17,7 @@ mod tests { use std::sync::Arc; use catalog::local::LocalCatalogManager; - use catalog::{CatalogManager, RegisterTableRequest}; + use catalog::{CatalogManager, RegisterTableRequest, RenameTableRequest}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_telemetry::{error, info}; use mito::config::EngineConfig; @@ -38,6 +38,44 @@ mod tests { Ok(catalog_manager) } + #[tokio::test] + async fn test_rename_table() { + common_telemetry::init_default_ut_logging(); + let catalog_manager = create_local_catalog_manager().await.unwrap(); + // register table + let table_name = "test_table"; + let table_id = 42; + let table = Arc::new(NumbersTable::new(table_id)); + let request = RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + table_id, + table: table.clone(), + }; + assert!(catalog_manager.register_table(request).await.unwrap()); + + // rename table + let new_table_name = "table_t"; + let rename_table_req = RenameTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + new_table_name: new_table_name.to_string(), + table_id, + }; + assert!(catalog_manager + .rename_table(rename_table_req) + .await + .unwrap()); + + let registered_table = catalog_manager + .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name) + .unwrap() + .unwrap(); + assert_eq!(registered_table.table_info().ident.table_id, table_id); + } + #[tokio::test] async fn test_duplicate_register() { let catalog_manager = create_local_catalog_manager().await.unwrap(); diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index aacb255d8c70..d9cea801ddce 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -193,6 +193,12 @@ pub enum Error { source: catalog::error::Error, }, + #[snafu(display("Failed to rename table, source: {}", source))] + RenameTable { + #[snafu(backtrace)] + source: catalog::error::Error, + }, + #[snafu(display("Failed to register a new schema, source: {}", source))] RegisterSchema { #[snafu(backtrace)] @@ -341,6 +347,7 @@ impl ErrorExt for Error { | Error::StartGrpc { .. } | Error::CreateDir { .. } | Error::InsertSystemCatalog { .. } + | Error::RenameTable { .. } | Error::RegisterSchema { .. } | Error::Catalog { .. } | Error::MissingRequiredField { .. } diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs index 9fa2b9bfe345..ec4304e53afd 100644 --- a/src/datanode/src/mock.rs +++ b/src/datanode/src/mock.rs @@ -16,18 +16,21 @@ use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use catalog::remote::MetaKvBackend; +use catalog::CatalogManagerRef; use common_catalog::consts::MIN_USER_TABLE_ID; use meta_client::client::{MetaClient, MetaClientBuilder}; use meta_srv::mocks::MockInfo; use mito::config::EngineConfig as TableEngineConfig; use query::QueryEngineFactory; +use servers::Mode; +use snafu::ResultExt; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; use table::metadata::TableId; use table::table::TableIdProvider; use crate::datanode::DatanodeOptions; -use crate::error::Result; +use crate::error::{CatalogSnafu, Result}; use crate::heartbeat::HeartbeatTask; use crate::instance::{create_log_store, new_object_store, DefaultEngine, Instance}; use crate::script::ScriptExecutor; @@ -53,16 +56,29 @@ impl Instance { object_store, )); - // create remote catalog manager - let catalog_manager = Arc::new(catalog::remote::RemoteCatalogManager::new( - table_engine.clone(), - opts.node_id.unwrap_or(42), - Arc::new(MetaKvBackend { - client: meta_client.clone(), - }), - )); - - let factory = QueryEngineFactory::new(catalog_manager.clone()); + // By default, catalog manager and factory are created in standalone mode + let (catalog_manager, factory) = match opts.mode { + Mode::Standalone => { + let catalog = Arc::new( + catalog::local::LocalCatalogManager::try_new(table_engine.clone()) + .await + .context(CatalogSnafu)?, + ); + let factory = QueryEngineFactory::new(catalog.clone()); + (catalog as CatalogManagerRef, factory) + } + Mode::Distributed => { + let catalog = Arc::new(catalog::remote::RemoteCatalogManager::new( + table_engine.clone(), + opts.node_id.unwrap_or(42), + Arc::new(MetaKvBackend { + client: meta_client.clone(), + }), + )); + let factory = QueryEngineFactory::new(catalog.clone()); + (catalog as CatalogManagerRef, factory) + } + }; let query_engine = factory.query_engine(); let script_executor = ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?; diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 8f618a23b1ce..e885bcdab08d 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -208,9 +208,15 @@ mod tests { ) -> catalog::error::Result> { unimplemented!(); } + + fn rename_table(&self, _name: &str, _new_name: String) -> catalog::error::Result { + unimplemented!() + } + fn deregister_table(&self, _name: &str) -> catalog::error::Result> { unimplemented!(); } + fn table_exist(&self, name: &str) -> catalog::error::Result { Ok(name == "demo") } diff --git a/src/datanode/src/sql/alter.rs b/src/datanode/src/sql/alter.rs index b9635ae2972f..a68b0ccf3d3a 100644 --- a/src/datanode/src/sql/alter.rs +++ b/src/datanode/src/sql/alter.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use catalog::RenameTableRequest; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::Output; use snafu::prelude::*; @@ -28,11 +29,11 @@ impl SqlHandler { let ctx = EngineContext {}; let catalog_name = req.catalog_name.as_deref().unwrap_or(DEFAULT_CATALOG_NAME); let schema_name = req.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME); - let table_name = &req.table_name.to_string(); + let table_name = req.table_name.clone(); let table_ref = TableReference { catalog: catalog_name, schema: schema_name, - table: table_name, + table: &table_name, }; let full_table_name = table_ref.to_string(); @@ -43,12 +44,28 @@ impl SqlHandler { table_name: &full_table_name, } ); - self.table_engine - .alter_table(&ctx, req) - .await - .context(error::AlterTableSnafu { - table_name: full_table_name, - })?; + let is_rename = req.is_rename_table(); + let table = + self.table_engine + .alter_table(&ctx, req) + .await + .context(error::AlterTableSnafu { + table_name: full_table_name, + })?; + if is_rename { + let table_info = &table.table_info(); + let rename_table_req = RenameTableRequest { + catalog: table_info.catalog_name.clone(), + schema: table_info.schema_name.clone(), + table_name, + new_table_name: table_info.name.clone(), + table_id: table_info.ident.table_id, + }; + self.catalog_manager + .rename_table(rename_table_req) + .await + .context(error::RenameTableSnafu)?; + } // Tried in MySQL, it really prints "Affected Rows: 0". Ok(Output::AffectedRows(0)) } diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index 215cf75a6d87..89e7f3c4067d 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -342,6 +342,58 @@ pub async fn test_execute_create() { assert!(matches!(output, Output::AffectedRows(0))); } +#[tokio::test] +async fn test_rename_table() { + let instance = MockInstance::new("test_rename_table_local").await; + + let output = execute_sql(&instance, "create database db").await; + assert!(matches!(output, Output::AffectedRows(1))); + + let output = execute_sql_in_db( + &instance, + "create table demo(host string, cpu double, memory double, ts timestamp, time index(ts))", + "db", + ) + .await; + assert!(matches!(output, Output::AffectedRows(0))); + + // make sure table insertion is ok before altering table name + let output = execute_sql_in_db( + &instance, + "insert into demo(host, cpu, memory, ts) values ('host1', 1.1, 100, 1000), ('host2', 2.2, 200, 2000)", + "db", + ) + .await; + assert!(matches!(output, Output::AffectedRows(2))); + + // rename table + let output = execute_sql_in_db(&instance, "alter table demo rename test_table", "db").await; + assert!(matches!(output, Output::AffectedRows(0))); + + let output = execute_sql_in_db(&instance, "show tables", "db").await; + let expect = "\ ++------------+ +| Tables | ++------------+ +| test_table | ++------------+\ +" + .to_string(); + check_output_stream(output, expect).await; + + let output = execute_sql_in_db(&instance, "select * from test_table order by ts", "db").await; + let expected = "\ ++-------+-----+--------+---------------------+ +| host | cpu | memory | ts | ++-------+-----+--------+---------------------+ +| host1 | 1.1 | 100 | 1970-01-01T00:00:01 | +| host2 | 2.2 | 200 | 1970-01-01T00:00:02 | ++-------+-----+--------+---------------------+\ +" + .to_string(); + check_output_stream(output, expected).await; +} + #[tokio::test(flavor = "multi_thread")] async fn test_alter_table() { let instance = setup_test_instance("test_alter_table").await; diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index e4cf05d249bd..b610fa24f9b0 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -24,8 +24,8 @@ use catalog::helper::{ use catalog::remote::{Kv, KvBackendRef}; use catalog::{ CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest, - RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, - SchemaProviderRef, + RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, + SchemaProvider, SchemaProviderRef, }; use futures::StreamExt; use meta_client::rpc::TableName; @@ -97,6 +97,10 @@ impl CatalogManager for FrontendCatalogManager { unimplemented!() } + async fn rename_table(&self, _request: RenameTableRequest) -> catalog_err::Result { + unimplemented!() + } + async fn register_system_table( &self, _request: RegisterSystemTableRequest, @@ -322,6 +326,10 @@ impl SchemaProvider for FrontendSchemaProvider { unimplemented!("Frontend schema provider does not support register table") } + fn rename_table(&self, _name: &str, _new_name: String) -> catalog_err::Result { + unimplemented!("Frontend schema provider does not support rename table") + } + fn deregister_table(&self, _name: &str) -> catalog::error::Result> { unimplemented!("Frontend schema provider does not support deregister table") } diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index ecd6849692c1..1e8bde0459af 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -270,7 +270,7 @@ impl DistInstance { let mut context = AlterContext::with_capacity(1); context.insert(expr); - table.alter(context, request).await.context(TableSnafu)?; + table.alter(context, &request).await.context(TableSnafu)?; Ok(Output::AffectedRows(0)) } diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index a02d02752b31..c8919bbb8ee9 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -163,7 +163,7 @@ impl Table for DistTable { Ok(FilterPushDownType::Inexact) } - async fn alter(&self, context: AlterContext, request: AlterTableRequest) -> table::Result<()> { + async fn alter(&self, context: AlterContext, request: &AlterTableRequest) -> table::Result<()> { self.handle_alter(context, request) .await .map_err(BoxedError::new) @@ -414,7 +414,7 @@ impl DistTable { .context(CatalogSnafu) } - async fn handle_alter(&self, context: AlterContext, request: AlterTableRequest) -> Result<()> { + async fn handle_alter(&self, context: AlterContext, request: &AlterTableRequest) -> Result<()> { let alter_expr = context .get::() .context(ContextValueNotFoundSnafu { key: "AlterExpr" })?; diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs index 799d60aa5163..9fd55e24fb9e 100644 --- a/src/frontend/src/tests.rs +++ b/src/frontend/src/tests.rs @@ -170,6 +170,7 @@ async fn create_distributed_datanode( storage: ObjectStoreConfig::File { data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), }, + mode: Mode::Distributed, ..Default::default() }; diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index 75845414d33b..b837f168808d 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -29,7 +29,9 @@ use store_api::storage::{ }; use table::engine::{EngineContext, TableEngine, TableReference}; use table::metadata::{TableId, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion}; -use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest}; +use table::requests::{ + AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, +}; use table::table::{AlterContext, TableRef}; use table::{error as table_error, Result as TableResult, Table}; use tokio::sync::Mutex; @@ -491,7 +493,22 @@ impl MitoEngineInner { let schema_name = req.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME); let table_name = &req.table_name.clone(); - let table_ref = TableReference { + if let AlterKind::RenameTable { new_table_name } = &req.alter_kind { + let table_ref = TableReference { + catalog: catalog_name, + schema: schema_name, + table: new_table_name, + }; + + if self.get_table(&table_ref).is_some() { + return TableExistsSnafu { + table_name: table_ref.to_string(), + } + .fail(); + } + } + + let mut table_ref = TableReference { catalog: catalog_name, schema: schema_name, table: table_name, @@ -502,9 +519,17 @@ impl MitoEngineInner { logging::info!("start altering table {} with request {:?}", table_name, req); table - .alter(AlterContext::new(), req) + .alter(AlterContext::new(), &req) .await .context(error::AlterTableSnafu { table_name })?; + + if let AlterKind::RenameTable { new_table_name } = &req.alter_kind { + table_ref.table = new_table_name.as_str(); + let full_table_name = table_ref.to_string(); + let mut tables = self.tables.write().unwrap(); + tables.remove(&full_table_name); + tables.insert(full_table_name, table.clone()); + } Ok(table) } @@ -556,7 +581,7 @@ mod tests { use super::*; use crate::table::test_util; - use crate::table::test_util::{new_insert_request, MockRegion, TABLE_NAME}; + use crate::table::test_util::{new_insert_request, schema_for_test, MockRegion, TABLE_NAME}; async fn setup_table_with_column_default_constraint() -> (TempDir, String, TableRef) { let table_name = "test_default_constraint"; @@ -1070,8 +1095,42 @@ mod tests { async fn test_alter_rename_table() { let (engine, table_engine, _table, object_store, _dir) = test_util::setup_mock_engine_and_table().await; + let ctx = EngineContext::default(); + + // register another table + let another_name = "another_table"; + let req = CreateTableRequest { + id: 1024, + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: another_name.to_string(), + desc: Some("another test table".to_string()), + schema: Arc::new(schema_for_test()), + region_numbers: vec![0], + primary_key_indices: vec![0], + create_if_not_exists: true, + table_options: HashMap::new(), + }; + table_engine + .create_table(&ctx, req) + .await + .expect("create table must succeed"); + // test renaming a table with an existing name. + let req = AlterTableRequest { + catalog_name: None, + schema_name: None, + table_name: TABLE_NAME.to_string(), + alter_kind: AlterKind::RenameTable { + new_table_name: another_name.to_string(), + }, + }; + let err = table_engine.alter_table(&ctx, req).await.err().unwrap(); + assert!( + err.to_string().contains("Table already exists"), + "Unexpected error: {err}" + ); - let new_table_name = "table_t"; + let new_table_name = "test_table"; // test rename table let req = AlterTableRequest { catalog_name: None, @@ -1081,7 +1140,6 @@ mod tests { new_table_name: new_table_name.to_string(), }, }; - let ctx = EngineContext::default(); let table = table_engine.alter_table(&ctx, req).await.unwrap(); assert_eq!(table.table_info().name, new_table_name); diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index e60523a71baf..16fd8ce63524 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -168,7 +168,7 @@ impl Table for MitoTable { } /// Alter table changes the schemas of the table. - async fn alter(&self, _context: AlterContext, req: AlterTableRequest) -> TableResult<()> { + async fn alter(&self, _context: AlterContext, req: &AlterTableRequest) -> TableResult<()> { let _lock = self.alter_lock.lock().await; let table_info = self.table_info(); diff --git a/src/query/src/datafusion/catalog_adapter.rs b/src/query/src/datafusion/catalog_adapter.rs index ce16df8c6516..d5bfd7ad62c6 100644 --- a/src/query/src/datafusion/catalog_adapter.rs +++ b/src/query/src/datafusion/catalog_adapter.rs @@ -231,6 +231,10 @@ impl SchemaProvider for SchemaProviderAdapter { .map(|_| table)) } + fn rename_table(&self, _name: &str, _new_name: String) -> catalog_error::Result { + todo!() + } + fn deregister_table(&self, name: &str) -> catalog::error::Result> { self.df_schema_provider .deregister_table(name) diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index d77cbdcef6f5..4d6a6b1b8746 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -70,14 +70,20 @@ pub struct AlterTableRequest { pub alter_kind: AlterKind, } +impl AlterTableRequest { + pub fn is_rename_table(&self) -> bool { + matches!(self.alter_kind, AlterKind::RenameTable { .. }) + } +} + /// Add column request -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct AddColumnRequest { pub column_schema: ColumnSchema, pub is_key: bool, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum AlterKind { AddColumns { columns: Vec }, DropColumns { names: Vec }, diff --git a/src/table/src/table.rs b/src/table/src/table.rs index 30c7b6cc503b..f440d6b9501d 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -77,7 +77,7 @@ pub trait Table: Send + Sync { } /// Alter table. - async fn alter(&self, _context: AlterContext, _request: AlterTableRequest) -> Result<()> { + async fn alter(&self, _context: AlterContext, _request: &AlterTableRequest) -> Result<()> { UnsupportedSnafu { operation: "ALTER TABLE", }