From bd05a024e3706badf642fcf10e7752581751ab54 Mon Sep 17 00:00:00 2001 From: elijah Date: Wed, 4 Jan 2023 10:50:59 +0800 Subject: [PATCH 01/16] feat: support renaming table in the catalog manger --- src/catalog/src/lib.rs | 27 ++++++++++++++++++++++++ src/catalog/src/local/manager.rs | 8 +++++++- src/catalog/src/local/memory.rs | 8 +++++++- src/catalog/src/remote/manager.rs | 7 ++++++- src/datanode/src/error.rs | 7 +++++++ src/datanode/src/sql/alter.rs | 34 +++++++++++++++++++++++-------- src/frontend/src/catalog.rs | 8 ++++++-- src/mito/src/engine.rs | 15 +++++++++++--- src/mito/src/table.rs | 2 +- src/table/src/requests.rs | 4 ++-- src/table/src/table.rs | 2 +- 11 files changed, 102 insertions(+), 20 deletions(-) diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 5f1d2d89fb38..b1dbda2e160c 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -97,6 +97,10 @@ pub trait CatalogManager: CatalogList { /// schema registered. async fn register_schema(&self, request: RegisterSchemaRequest) -> Result; + /// Rename a table within given catalog/schema/table_name to catalog manager, + /// returns whether the table renamed. + async fn rename_table(&self, request: RenameTableRequest) -> Result; + /// Register a system table, should be called before starting the manager. async fn register_system_table(&self, request: RegisterSystemTableRequest) -> error::Result<()>; @@ -142,6 +146,29 @@ impl Debug for RegisterTableRequest { } } +#[derive(Clone)] +pub struct RenameTableRequest { + pub catalog: String, + pub schema: String, + pub table_name: String, + pub new_table_name: String, + pub table_id: TableId, + pub table: TableRef, +} + +impl Debug for RenameTableRequest { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + f.debug_struct("RenameTableRequest") + .field("catalog", &self.catalog) + .field("schema", &self.schema) + .field("table_name", &self.table_name) + .field("new_table_name", &self.new_table_name) + .field("table_id", &self.table_id) + .field("table", &self.table.table_info()) + .finish() + } +} + #[derive(Clone)] pub struct DeregisterTableRequest { pub catalog: String, diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index 375967dab870..154998bbab1f 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -47,7 +47,8 @@ use crate::tables::SystemCatalog; use crate::{ format_full_table_name, handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest, RegisterSchemaRequest, - RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, SchemaProviderRef, + RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, SchemaProvider, + SchemaProviderRef, }; /// A `CatalogManager` consists of a system catalog and a bunch of user catalogs. @@ -379,6 +380,11 @@ impl CatalogManager for LocalCatalogManager { } } + async fn rename_table(&self, _request: RenameTableRequest) -> Result { + // todo impl rename_table for catalog manager + todo!() + } + async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result { UnimplementedSnafu { operation: "deregister table", diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index e4e1dc0da405..400ddaf47e7a 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -29,7 +29,8 @@ use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExist use crate::schema::SchemaProvider; use crate::{ CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest, - RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProviderRef, + RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, + SchemaProviderRef, }; /// Simple in-memory list of catalogs @@ -89,6 +90,11 @@ impl CatalogManager for MemoryCatalogManager { .map(|v| v.is_none()) } + async fn rename_table(&self, _request: RenameTableRequest) -> Result { + // todo impl rename_table for catalog manager + todo!() + } + async fn deregister_table(&self, request: DeregisterTableRequest) -> Result { let catalogs = self.catalogs.write().unwrap(); let catalog = catalogs diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 36659c5c04ab..1d8a4cfa7492 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -43,7 +43,7 @@ use crate::remote::{Kv, KvBackendRef}; use crate::{ handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest, - RegisterTableRequest, SchemaProvider, SchemaProviderRef, + RegisterTableRequest, RenameTableRequest, SchemaProvider, SchemaProviderRef, }; /// Catalog manager based on metasrv. @@ -449,6 +449,11 @@ impl CatalogManager for RemoteCatalogManager { Ok(true) } + async fn rename_table(&self, _request: RenameTableRequest) -> Result { + // todo impl rename_table for catalog manager + todo!() + } + async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> { let mut requests = self.system_table_requests.lock().await; requests.push(request); diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 07114ea2a66d..827529ca6303 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -193,6 +193,12 @@ pub enum Error { source: catalog::error::Error, }, + #[snafu(display("Failed to rename table, source: {}", source))] + RenameTable { + #[snafu(backtrace)] + source: catalog::error::Error, + }, + #[snafu(display("Failed to register a new schema, source: {}", source))] RegisterSchema { #[snafu(backtrace)] @@ -337,6 +343,7 @@ impl ErrorExt for Error { | Error::StartGrpc { .. } | Error::CreateDir { .. } | Error::InsertSystemCatalog { .. } + | Error::RenameTable { .. } | Error::RegisterSchema { .. } | Error::Catalog { .. } | Error::MissingRequiredField { .. } diff --git a/src/datanode/src/sql/alter.rs b/src/datanode/src/sql/alter.rs index b9635ae2972f..5d5f321fe3d1 100644 --- a/src/datanode/src/sql/alter.rs +++ b/src/datanode/src/sql/alter.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use catalog::RenameTableRequest; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::Output; use snafu::prelude::*; @@ -28,11 +29,11 @@ impl SqlHandler { let ctx = EngineContext {}; let catalog_name = req.catalog_name.as_deref().unwrap_or(DEFAULT_CATALOG_NAME); let schema_name = req.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME); - let table_name = &req.table_name.to_string(); + let table_name = req.table_name.clone(); let table_ref = TableReference { catalog: catalog_name, schema: schema_name, - table: table_name, + table: &table_name, }; let full_table_name = table_ref.to_string(); @@ -43,12 +44,29 @@ impl SqlHandler { table_name: &full_table_name, } ); - self.table_engine - .alter_table(&ctx, req) - .await - .context(error::AlterTableSnafu { - table_name: full_table_name, - })?; + let kind = req.alter_kind.clone(); + let table = + self.table_engine + .alter_table(&ctx, req) + .await + .context(error::AlterTableSnafu { + table_name: full_table_name, + })?; + let table_info = &table.table_info(); + if let AlterKind::RenameTable { .. } = kind { + let rename_table_req = RenameTableRequest { + catalog: table_info.catalog_name.clone(), + schema: table_info.schema_name.clone(), + table_name, + new_table_name: table_info.name.clone(), + table_id: table_info.ident.table_id, + table, + }; + self.catalog_manager + .rename_table(rename_table_req) + .await + .context(error::RenameTableSnafu)?; + } // Tried in MySQL, it really prints "Affected Rows: 0". Ok(Output::AffectedRows(0)) } diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index e4cf05d249bd..93f8c66ca8d3 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -24,8 +24,8 @@ use catalog::helper::{ use catalog::remote::{Kv, KvBackendRef}; use catalog::{ CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest, - RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, - SchemaProviderRef, + RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, + SchemaProvider, SchemaProviderRef, }; use futures::StreamExt; use meta_client::rpc::TableName; @@ -97,6 +97,10 @@ impl CatalogManager for FrontendCatalogManager { unimplemented!() } + async fn rename_table(&self, _request: RenameTableRequest) -> catalog_err::Result { + unimplemented!() + } + async fn register_system_table( &self, _request: RegisterSystemTableRequest, diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index 75845414d33b..e77b751a1bdf 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -29,7 +29,9 @@ use store_api::storage::{ }; use table::engine::{EngineContext, TableEngine, TableReference}; use table::metadata::{TableId, TableInfoBuilder, TableMetaBuilder, TableType, TableVersion}; -use table::requests::{AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest}; +use table::requests::{ + AlterKind, AlterTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest, +}; use table::table::{AlterContext, TableRef}; use table::{error as table_error, Result as TableResult, Table}; use tokio::sync::Mutex; @@ -491,7 +493,7 @@ impl MitoEngineInner { let schema_name = req.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME); let table_name = &req.table_name.clone(); - let table_ref = TableReference { + let mut table_ref = TableReference { catalog: catalog_name, schema: schema_name, table: table_name, @@ -502,9 +504,16 @@ impl MitoEngineInner { logging::info!("start altering table {} with request {:?}", table_name, req); table - .alter(AlterContext::new(), req) + .alter(AlterContext::new(), &req) .await .context(error::AlterTableSnafu { table_name })?; + + if let AlterKind::RenameTable { new_table_name } = &req.alter_kind { + table_ref.table = new_table_name.as_str(); + let mut tables = self.tables.write().unwrap(); + tables.remove(&table_ref.to_string()); + tables.insert(table_ref.to_string(), table.clone()); + } Ok(table) } diff --git a/src/mito/src/table.rs b/src/mito/src/table.rs index e60523a71baf..16fd8ce63524 100644 --- a/src/mito/src/table.rs +++ b/src/mito/src/table.rs @@ -168,7 +168,7 @@ impl Table for MitoTable { } /// Alter table changes the schemas of the table. - async fn alter(&self, _context: AlterContext, req: AlterTableRequest) -> TableResult<()> { + async fn alter(&self, _context: AlterContext, req: &AlterTableRequest) -> TableResult<()> { let _lock = self.alter_lock.lock().await; let table_info = self.table_info(); diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index d77cbdcef6f5..780d26788f80 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -71,13 +71,13 @@ pub struct AlterTableRequest { } /// Add column request -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct AddColumnRequest { pub column_schema: ColumnSchema, pub is_key: bool, } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum AlterKind { AddColumns { columns: Vec }, DropColumns { names: Vec }, diff --git a/src/table/src/table.rs b/src/table/src/table.rs index 30c7b6cc503b..f440d6b9501d 100644 --- a/src/table/src/table.rs +++ b/src/table/src/table.rs @@ -77,7 +77,7 @@ pub trait Table: Send + Sync { } /// Alter table. - async fn alter(&self, _context: AlterContext, _request: AlterTableRequest) -> Result<()> { + async fn alter(&self, _context: AlterContext, _request: &AlterTableRequest) -> Result<()> { UnsupportedSnafu { operation: "ALTER TABLE", } From 3bf187d14ec5d037e067973ed7912c63843a1d03 Mon Sep 17 00:00:00 2001 From: elijah Date: Fri, 6 Jan 2023 12:31:08 +0800 Subject: [PATCH 02/16] feat: implement rename table for local catalog manager --- src/catalog/src/lib.rs | 5 + src/catalog/src/local/manager.rs | 37 ++++-- src/catalog/src/local/memory.rs | 132 +++++++++++++++++++- src/catalog/src/remote/manager.rs | 9 ++ src/catalog/src/schema.rs | 9 ++ src/catalog/src/tables.rs | 15 ++- src/catalog/tests/local_catalog_tests.rs | 39 +++++- src/datanode/src/sql.rs | 11 ++ src/frontend/src/catalog.rs | 9 ++ src/query/src/datafusion/catalog_adapter.rs | 9 ++ 10 files changed, 258 insertions(+), 17 deletions(-) diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index b1dbda2e160c..77d667d1ad70 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -187,6 +187,11 @@ pub fn format_full_table_name(catalog: &str, schema: &str, table: &str) -> Strin format!("{catalog}.{schema}.{table}") } +/// Formats table name by table id +pub fn format_full_table_name_by_id(catalog: &str, schema: &str, table_id: &TableId) -> String { + format!("{catalog}.{schema}.{table_id}") +} + pub trait CatalogProviderFactory { fn create(&self, catalog_name: String) -> CatalogProviderRef; } diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index 154998bbab1f..8c8e18b71c63 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -367,12 +367,7 @@ impl CatalogManager for LocalCatalogManager { } else { // table does not exist self.system - .register_table( - catalog_name.clone(), - schema_name.clone(), - request.table_name.clone(), - request.table_id, - ) + .register_table(catalog_name.clone(), schema_name.clone(), request.table_id) .await?; schema.register_table(request.table_name, request.table)?; Ok(true) @@ -380,9 +375,33 @@ impl CatalogManager for LocalCatalogManager { } } - async fn rename_table(&self, _request: RenameTableRequest) -> Result { - // todo impl rename_table for catalog manager - todo!() + async fn rename_table(&self, request: RenameTableRequest) -> Result { + let started = self.init_lock.lock().await; + + ensure!( + *started, + IllegalManagerStateSnafu { + msg: "Catalog manager not started", + } + ); + + let catalog_name = &request.catalog; + let schema_name = &request.schema; + + let catalog = self + .catalogs + .catalog(catalog_name)? + .context(CatalogNotFoundSnafu { catalog_name })?; + + let schema = catalog + .schema(schema_name)? + .with_context(|| SchemaNotFoundSnafu { + schema_info: format!("{catalog_name}.{schema_name}"), + })?; + + schema + .rename_table(&request.table_name, request.new_table_name, request.table) + .map(|v| v.is_none()) } async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result { diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index 400ddaf47e7a..59aba513c475 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -25,7 +25,9 @@ use table::metadata::TableId; use table::table::TableIdProvider; use table::TableRef; -use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu}; +use crate::error::{ + CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, TableNotFoundSnafu, +}; use crate::schema::SchemaProvider; use crate::{ CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest, @@ -90,9 +92,22 @@ impl CatalogManager for MemoryCatalogManager { .map(|v| v.is_none()) } - async fn rename_table(&self, _request: RenameTableRequest) -> Result { - // todo impl rename_table for catalog manager - todo!() + async fn rename_table(&self, request: RenameTableRequest) -> Result { + let catalogs = self.catalogs.write().unwrap(); + let catalog = catalogs + .get(&request.catalog) + .context(CatalogNotFoundSnafu { + catalog_name: &request.catalog, + })? + .clone(); + let schema = catalog + .schema(&request.schema)? + .with_context(|| SchemaNotFoundSnafu { + schema_info: format!("{}.{}", &request.catalog, &request.schema), + })?; + schema + .rename_table(&request.table_name, request.new_table_name, request.table) + .map(|v| v.is_none()) } async fn deregister_table(&self, request: DeregisterTableRequest) -> Result { @@ -296,6 +311,33 @@ impl SchemaProvider for MemorySchemaProvider { } } + fn rename_table( + &self, + name: &str, + new_name: String, + table: TableRef, + ) -> Result> { + let mut tables = self.tables.write().unwrap(); + if let Some(existing) = tables.get(name) { + // if table with the same name but different table id exists, then it's a fatal bug + if existing.table_info().ident.table_id != table.table_info().ident.table_id { + error!( + "Unexpected table rename: {:?}, existing: {:?}", + table.table_info(), + existing.table_info() + ); + return TableExistsSnafu { table: name }.fail()?; + } + tables.remove(name); + Ok(tables.insert(new_name, table)) + } else { + TableNotFoundSnafu { + table_info: name.to_string(), + } + .fail()? + } + } + fn deregister_table(&self, name: &str) -> Result> { let mut tables = self.tables.write().unwrap(); Ok(tables.remove(name)) @@ -360,6 +402,88 @@ mod tests { assert_eq!(StatusCode::TableAlreadyExists, err.status_code()); } + #[tokio::test] + async fn test_mem_provider_rename_table() { + let provider = MemorySchemaProvider::new(); + let table_name = "num"; + assert!(!provider.table_exist(table_name).unwrap()); + let test_table: TableRef = Arc::new(NumbersTable::default()); + // register test table + assert!(provider + .register_table(table_name.to_string(), test_table.clone()) + .unwrap() + .is_none()); + assert!(provider.table_exist(table_name).unwrap()); + + // rename test table + let new_table_name = "numbers"; + assert!(provider + .rename_table(table_name, new_table_name.to_string(), test_table.clone(),) + .unwrap() + .is_none()); + + // test old table name not exist + assert!(!provider.table_exist(table_name).unwrap()); + assert!(provider.deregister_table(table_name).unwrap().is_none()); + + // test new table name exists + assert!(provider.table_exist(new_table_name).unwrap()); + let registered_table = provider.table(new_table_name).unwrap().unwrap(); + assert_eq!( + registered_table.table_info().ident.table_id, + test_table.table_info().ident.table_id + ); + + let other_table = Arc::new(NumbersTable::new(2)); + let result = provider.register_table(new_table_name.to_string(), other_table); + let err = result.err().unwrap(); + assert!(err.backtrace_opt().is_some()); + assert_eq!(StatusCode::TableAlreadyExists, err.status_code()); + } + + #[tokio::test] + async fn test_catalog_rename_table() { + let catalog = MemoryCatalogManager::default(); + let schema = catalog + .schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME) + .unwrap() + .unwrap(); + + // register table + let table_name = "num"; + let table_id = 2333; + let table: TableRef = Arc::new(NumbersTable::new(table_id)); + let register_table_req = RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + table_id, + table: table.clone(), + }; + assert!(catalog.register_table(register_table_req).await.unwrap()); + assert!(schema.table_exist(table_name).unwrap()); + + // rename table + let new_table_name = "numbers"; + let rename_table_req = RenameTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + new_table_name: new_table_name.to_string(), + table_id, + table: table.clone(), + }; + assert!(catalog.rename_table(rename_table_req).await.unwrap()); + assert!(!schema.table_exist(table_name).unwrap()); + assert!(schema.table_exist(new_table_name).unwrap()); + + let registered_table = catalog + .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name) + .unwrap() + .unwrap(); + assert_eq!(registered_table.table_info().ident.table_id, table_id); + } + #[test] pub fn test_register_if_absent() { let list = MemoryCatalogManager::default(); diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index 1d8a4cfa7492..ab247b6c7255 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -744,6 +744,15 @@ impl SchemaProvider for RemoteSchemaProvider { prev } + fn rename_table( + &self, + _name: &str, + _new_name: String, + _table: TableRef, + ) -> Result> { + todo!() + } + fn deregister_table(&self, name: &str) -> Result> { let table_name = name.to_string(); let table_key = self.build_regional_table_key(&table_name).to_string(); diff --git a/src/catalog/src/schema.rs b/src/catalog/src/schema.rs index 3f91f8d81b12..6d24d64a5a83 100644 --- a/src/catalog/src/schema.rs +++ b/src/catalog/src/schema.rs @@ -35,6 +35,15 @@ pub trait SchemaProvider: Sync + Send { /// If a table of the same name existed before, it returns "Table already exists" error. fn register_table(&self, name: String, table: TableRef) -> Result>; + /// If supported by the implementation, renames an existing table from this schema and returns it. + /// If no table of that name exists, returns "Table not found" error. + fn rename_table( + &self, + name: &str, + new_name: String, + table: TableRef, + ) -> Result>; + /// If supported by the implementation, removes an existing table from this schema and returns it. /// If no table of that name exists, returns Ok(None). fn deregister_table(&self, name: &str) -> Result>; diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs index 1485c51e1d7b..65100d6a82d8 100644 --- a/src/catalog/src/tables.rs +++ b/src/catalog/src/tables.rs @@ -41,7 +41,8 @@ use table::{Table, TableRef}; use crate::error::{Error, InsertCatalogRecordSnafu}; use crate::system::{build_schema_insert_request, build_table_insert_request, SystemCatalogTable}; use crate::{ - format_full_table_name, CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef, + format_full_table_name_by_id, CatalogListRef, CatalogProvider, SchemaProvider, + SchemaProviderRef, }; /// Tables holds all tables created by user. @@ -233,6 +234,15 @@ impl SchemaProvider for InformationSchema { panic!("System catalog & schema does not support register table") } + fn rename_table( + &self, + _name: &str, + _new_name: String, + _table: TableRef, + ) -> crate::error::Result> { + panic!("System catalog & schema does not support rename table") + } + fn deregister_table(&self, _name: &str) -> crate::error::Result> { panic!("System catalog & schema does not support deregister table") } @@ -266,10 +276,9 @@ impl SystemCatalog { &self, catalog: String, schema: String, - table_name: String, table_id: TableId, ) -> crate::error::Result { - let full_table_name = format_full_table_name(&catalog, &schema, &table_name); + let full_table_name = format_full_table_name_by_id(&catalog, &schema, &table_id); let request = build_table_insert_request(full_table_name, table_id); self.information_schema .system diff --git a/src/catalog/tests/local_catalog_tests.rs b/src/catalog/tests/local_catalog_tests.rs index f40639ed4bc6..b1f54b414e26 100644 --- a/src/catalog/tests/local_catalog_tests.rs +++ b/src/catalog/tests/local_catalog_tests.rs @@ -17,7 +17,7 @@ mod tests { use std::sync::Arc; use catalog::local::LocalCatalogManager; - use catalog::{CatalogManager, RegisterTableRequest}; + use catalog::{CatalogManager, RegisterTableRequest, RenameTableRequest}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_telemetry::{error, info}; use mito::config::EngineConfig; @@ -38,6 +38,43 @@ mod tests { Ok(catalog_manager) } + #[tokio::test] + async fn test_rename_table() { + let catalog_manager = create_local_catalog_manager().await.unwrap(); + // register table + let table_name = "test_table"; + let table_id = 42; + let table = Arc::new(NumbersTable::new(table_id)); + let request = RegisterTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + table_id, + table: table.clone(), + }; + assert!(catalog_manager.register_table(request).await.unwrap()); + + let new_table_name = "table_t"; + let rename_table_req = RenameTableRequest { + catalog: DEFAULT_CATALOG_NAME.to_string(), + schema: DEFAULT_SCHEMA_NAME.to_string(), + table_name: table_name.to_string(), + new_table_name: new_table_name.to_string(), + table_id, + table: table.clone(), + }; + assert!(catalog_manager + .rename_table(rename_table_req) + .await + .unwrap()); + + let registered_table = catalog_manager + .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name) + .unwrap() + .unwrap(); + assert_eq!(registered_table.table_info().ident.table_id, table_id); + } + #[tokio::test] async fn test_duplicate_register() { let catalog_manager = create_local_catalog_manager().await.unwrap(); diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 8f618a23b1ce..760ad4c59da7 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -208,9 +208,20 @@ mod tests { ) -> catalog::error::Result> { unimplemented!(); } + + fn rename_table( + &self, + _name: &str, + _new_name: String, + _table: TableRef, + ) -> catalog::error::Result> { + unimplemented!() + } + fn deregister_table(&self, _name: &str) -> catalog::error::Result> { unimplemented!(); } + fn table_exist(&self, name: &str) -> catalog::error::Result { Ok(name == "demo") } diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 93f8c66ca8d3..3681a36f57ac 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -326,6 +326,15 @@ impl SchemaProvider for FrontendSchemaProvider { unimplemented!("Frontend schema provider does not support register table") } + fn rename_table( + &self, + _name: &str, + _new_name: String, + _table: TableRef, + ) -> catalog_err::Result> { + unimplemented!("Frontend schema provider does not support rename table") + } + fn deregister_table(&self, _name: &str) -> catalog::error::Result> { unimplemented!("Frontend schema provider does not support deregister table") } diff --git a/src/query/src/datafusion/catalog_adapter.rs b/src/query/src/datafusion/catalog_adapter.rs index ce16df8c6516..3a176154a566 100644 --- a/src/query/src/datafusion/catalog_adapter.rs +++ b/src/query/src/datafusion/catalog_adapter.rs @@ -231,6 +231,15 @@ impl SchemaProvider for SchemaProviderAdapter { .map(|_| table)) } + fn rename_table( + &self, + _name: &str, + _new_name: String, + _table: TableRef, + ) -> catalog_error::Result> { + todo!() + } + fn deregister_table(&self, name: &str) -> catalog::error::Result> { self.df_schema_provider .deregister_table(name) From 7822dc0001e6da7d365fe1bcc2e6c43e18b69b46 Mon Sep 17 00:00:00 2001 From: elijah Date: Fri, 6 Jan 2023 13:00:39 +0800 Subject: [PATCH 03/16] chore: fmt code --- src/catalog/src/local/manager.rs | 3 ++- src/catalog/src/local/memory.rs | 3 ++- src/frontend/src/instance/distributed.rs | 2 +- src/frontend/src/table.rs | 4 ++-- 4 files changed, 7 insertions(+), 5 deletions(-) diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index 8c8e18b71c63..f299c4af93a1 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -396,7 +396,8 @@ impl CatalogManager for LocalCatalogManager { let schema = catalog .schema(schema_name)? .with_context(|| SchemaNotFoundSnafu { - schema_info: format!("{catalog_name}.{schema_name}"), + catalog: catalog_name, + schema: schema_name, })?; schema diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index 59aba513c475..0627e42e5a46 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -103,7 +103,8 @@ impl CatalogManager for MemoryCatalogManager { let schema = catalog .schema(&request.schema)? .with_context(|| SchemaNotFoundSnafu { - schema_info: format!("{}.{}", &request.catalog, &request.schema), + catalog: &request.catalog, + schema: &request.schema, })?; schema .rename_table(&request.table_name, request.new_table_name, request.table) diff --git a/src/frontend/src/instance/distributed.rs b/src/frontend/src/instance/distributed.rs index 4e8c333a78de..5ce4ff932194 100644 --- a/src/frontend/src/instance/distributed.rs +++ b/src/frontend/src/instance/distributed.rs @@ -272,7 +272,7 @@ impl DistInstance { let mut context = AlterContext::with_capacity(1); context.insert(expr); - table.alter(context, request).await.context(TableSnafu)?; + table.alter(context, &request).await.context(TableSnafu)?; Ok(Output::AffectedRows(0)) } diff --git a/src/frontend/src/table.rs b/src/frontend/src/table.rs index a02d02752b31..c8919bbb8ee9 100644 --- a/src/frontend/src/table.rs +++ b/src/frontend/src/table.rs @@ -163,7 +163,7 @@ impl Table for DistTable { Ok(FilterPushDownType::Inexact) } - async fn alter(&self, context: AlterContext, request: AlterTableRequest) -> table::Result<()> { + async fn alter(&self, context: AlterContext, request: &AlterTableRequest) -> table::Result<()> { self.handle_alter(context, request) .await .map_err(BoxedError::new) @@ -414,7 +414,7 @@ impl DistTable { .context(CatalogSnafu) } - async fn handle_alter(&self, context: AlterContext, request: AlterTableRequest) -> Result<()> { + async fn handle_alter(&self, context: AlterContext, request: &AlterTableRequest) -> Result<()> { let alter_expr = context .get::() .context(ContextValueNotFoundSnafu { key: "AlterExpr" })?; From 7528a4b065c4f45277d5fe8be35449b93b199150 Mon Sep 17 00:00:00 2001 From: elijah Date: Sat, 7 Jan 2023 16:12:18 +0800 Subject: [PATCH 04/16] fix: update system catalog when renaming table in local catalog manager --- src/catalog/src/error.rs | 10 +++ src/catalog/src/lib.rs | 5 -- src/catalog/src/local/manager.rs | 16 ++++- src/catalog/src/system.rs | 56 +++++++++++++++-- src/catalog/src/tables.rs | 79 ++++++++++++++++++++++-- src/catalog/tests/local_catalog_tests.rs | 2 + 6 files changed, 152 insertions(+), 16 deletions(-) diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index a2b1657642bd..89c2fa379a6c 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -142,6 +142,15 @@ pub enum Error { source: table::error::Error, }, + #[snafu(display( + "Failed to delete table creation record to system catalog, source: {}", + source + ))] + DeleteCatalogRecord { + #[snafu(backtrace)] + source: table::error::Error, + }, + #[snafu(display("Illegal catalog manager state: {}", msg))] IllegalManagerState { backtrace: Backtrace, msg: String }, @@ -237,6 +246,7 @@ impl ErrorExt for Error { Error::OpenSystemCatalog { source, .. } | Error::CreateSystemCatalog { source, .. } | Error::InsertCatalogRecord { source, .. } + | Error::DeleteCatalogRecord { source, .. } | Error::OpenTable { source, .. } | Error::CreateTable { source, .. } => source.status_code(), Error::MetaSrv { source, .. } => source.status_code(), diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 77d667d1ad70..b1dbda2e160c 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -187,11 +187,6 @@ pub fn format_full_table_name(catalog: &str, schema: &str, table: &str) -> Strin format!("{catalog}.{schema}.{table}") } -/// Formats table name by table id -pub fn format_full_table_name_by_id(catalog: &str, schema: &str, table_id: &TableId) -> String { - format!("{catalog}.{schema}.{table_id}") -} - pub trait CatalogProviderFactory { fn create(&self, catalog_name: String) -> CatalogProviderRef; } diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index f299c4af93a1..6682864e4d13 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -367,7 +367,12 @@ impl CatalogManager for LocalCatalogManager { } else { // table does not exist self.system - .register_table(catalog_name.clone(), schema_name.clone(), request.table_id) + .register_table( + catalog_name.clone(), + schema_name.clone(), + request.table_name.clone(), + request.table_id, + ) .await?; schema.register_table(request.table_name, request.table)?; Ok(true) @@ -400,6 +405,15 @@ impl CatalogManager for LocalCatalogManager { schema: schema_name, })?; + self.system + .rename_table( + catalog_name.clone(), + schema_name.clone(), + request.table_name.clone(), + request.new_table_name.clone(), + request.table_id, + ) + .await?; schema .rename_table(&request.table_name, request.new_table_name, request.table) .map(|v| v.is_none()) diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index d135365c4ce4..8a45ab8db439 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -32,7 +32,7 @@ use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; use table::engine::{EngineContext, TableEngineRef}; use table::metadata::{TableId, TableInfoRef}; -use table::requests::{CreateTableRequest, InsertRequest, OpenTableRequest}; +use table::requests::{CreateTableRequest, DeleteRequest, InsertRequest, OpenTableRequest}; use table::{Table, TableRef}; use crate::error::{ @@ -73,6 +73,11 @@ impl Table for SystemCatalogTable { self.table.insert(request).await } + /// Delete row from table + async fn delete(&self, request: DeleteRequest) -> table::Result { + self.table.delete(request).await + } + fn table_info(&self) -> TableInfoRef { self.table_info.clone() } @@ -168,17 +173,17 @@ fn build_system_catalog_schema() -> Schema { ColumnSchema::new( "value".to_string(), ConcreteDataType::binary_datatype(), - false, + true, ), ColumnSchema::new( "gmt_created".to_string(), ConcreteDataType::timestamp_millisecond_datatype(), - false, + true, ), ColumnSchema::new( "gmt_modified".to_string(), ConcreteDataType::timestamp_millisecond_datatype(), - false, + true, ), ]; @@ -207,6 +212,27 @@ pub fn build_schema_insert_request(catalog_name: String, schema_name: String) -> ) } +pub fn build_table_delete_request(full_table_name: String) -> DeleteRequest { + build_delete_request(EntryType::Table, full_table_name.as_bytes()) +} + +pub fn build_delete_request(entry_type: EntryType, key: &[u8]) -> DeleteRequest { + let mut key_column_values = HashMap::with_capacity(3); + key_column_values.insert( + "entry_type".to_string(), + Arc::new(UInt8Vector::from_slice(&[entry_type as u8])) as _, + ); + key_column_values.insert( + "key".to_string(), + Arc::new(BinaryVector::from_slice(&[key])) as _, + ); + key_column_values.insert( + "timestamp".to_string(), + Arc::new(TimestampMillisecondVector::from_slice(&[0])) as _, + ); + DeleteRequest { key_column_values } +} + pub fn build_insert_request(entry_type: EntryType, key: &[u8], value: &[u8]) -> InsertRequest { let mut columns_values = HashMap::with_capacity(6); columns_values.insert( @@ -378,6 +404,8 @@ mod tests { use tempdir::TempDir; use super::*; + use crate::error::DeleteCatalogRecordSnafu; + use crate::format_full_table_name; #[test] pub fn test_decode_catalog_entry() { @@ -487,4 +515,24 @@ mod tests { assert_eq!(SYSTEM_CATALOG_NAME, info.catalog_name); assert_eq!(INFORMATION_SCHEMA_NAME, info.schema_name); } + + #[tokio::test] + async fn test_system_table_delete() { + let (_dir, table_engine) = prepare_table_engine().await; + let system_table = SystemCatalogTable::new(table_engine).await.unwrap(); + let table_name = "test_table"; + let table_id = 42; + let full_table_name = + format_full_table_name(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name); + let insert_req = build_table_insert_request(full_table_name.clone(), table_id); + + assert!(system_table.insert(insert_req).await.is_ok()); + + let delete_req = build_table_delete_request(full_table_name); + let ret = system_table + .delete(delete_req) + .await + .context(DeleteCatalogRecordSnafu); + assert!(ret.is_ok()); + } } diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs index 65100d6a82d8..38ebb787f2eb 100644 --- a/src/catalog/src/tables.rs +++ b/src/catalog/src/tables.rs @@ -38,11 +38,13 @@ use table::metadata::{TableId, TableInfoRef}; use table::table::scan::SimpleTableScan; use table::{Table, TableRef}; -use crate::error::{Error, InsertCatalogRecordSnafu}; -use crate::system::{build_schema_insert_request, build_table_insert_request, SystemCatalogTable}; +use crate::error::{DeleteCatalogRecordSnafu, Error, InsertCatalogRecordSnafu}; +use crate::system::{ + build_schema_insert_request, build_table_delete_request, build_table_insert_request, + SystemCatalogTable, +}; use crate::{ - format_full_table_name_by_id, CatalogListRef, CatalogProvider, SchemaProvider, - SchemaProviderRef, + format_full_table_name, CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef, }; /// Tables holds all tables created by user. @@ -276,9 +278,10 @@ impl SystemCatalog { &self, catalog: String, schema: String, + table_name: String, table_id: TableId, ) -> crate::error::Result { - let full_table_name = format_full_table_name_by_id(&catalog, &schema, &table_id); + let full_table_name = format_full_table_name(&catalog, &schema, &table_name); let request = build_table_insert_request(full_table_name, table_id); self.information_schema .system @@ -287,6 +290,26 @@ impl SystemCatalog { .context(InsertCatalogRecordSnafu) } + pub async fn rename_table( + &self, + catalog: String, + schema: String, + table_name: String, + new_table_name: String, + table_id: TableId, + ) -> crate::error::Result { + let full_table_name = format_full_table_name(&catalog, &schema, &table_name); + let delete_table_req = build_table_delete_request(full_table_name); + self.information_schema + .system + .delete(delete_table_req) + .await + .context(DeleteCatalogRecordSnafu)?; + + self.register_table(catalog, schema, new_table_name, table_id) + .await + } + pub async fn register_schema( &self, catalog: String, @@ -358,11 +381,13 @@ mod tests { use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::physical_plan::SessionContext; use futures_util::StreamExt; + use mito::config::EngineConfig; + use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine}; use table::table::numbers::NumbersTable; use super::*; use crate::local::memory::new_memory_catalog_list; - use crate::CatalogList; + use crate::{error, CatalogList}; #[tokio::test] async fn test_tables() { @@ -426,4 +451,46 @@ mod tests { panic!("Record batch should not be empty!") } } + + async fn create_system_catalog() -> Result { + let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await; + let mock_engine = Arc::new(MockMitoEngine::new( + EngineConfig::default(), + MockEngine::default(), + object_store, + )); + let table = SystemCatalogTable::new(mock_engine.clone()).await?; + let memory_catalog_list = new_memory_catalog_list().unwrap(); + Ok(SystemCatalog::new(table, memory_catalog_list, mock_engine)) + } + + #[tokio::test] + async fn test_rename_table() { + let system_catalog = create_system_catalog().await.unwrap(); + let catalog = DEFAULT_CATALOG_NAME.to_string(); + let schema = DEFAULT_SCHEMA_NAME.to_string(); + let table_name = "test_table"; + let table_id = 42; + assert!(system_catalog + .register_table( + catalog.clone(), + schema.clone(), + table_name.to_string(), + table_id + ) + .await + .is_ok()); + + let new_table_name = "demo"; + let ret = system_catalog + .rename_table( + catalog.clone(), + schema.clone(), + table_name.to_string(), + new_table_name.to_string(), + table_id, + ) + .await; + assert!(ret.is_ok()); + } } diff --git a/src/catalog/tests/local_catalog_tests.rs b/src/catalog/tests/local_catalog_tests.rs index b1f54b414e26..8c7f31c00570 100644 --- a/src/catalog/tests/local_catalog_tests.rs +++ b/src/catalog/tests/local_catalog_tests.rs @@ -40,6 +40,7 @@ mod tests { #[tokio::test] async fn test_rename_table() { + common_telemetry::init_default_ut_logging(); let catalog_manager = create_local_catalog_manager().await.unwrap(); // register table let table_name = "test_table"; @@ -54,6 +55,7 @@ mod tests { }; assert!(catalog_manager.register_table(request).await.unwrap()); + // rename table let new_table_name = "table_t"; let rename_table_req = RenameTableRequest { catalog: DEFAULT_CATALOG_NAME.to_string(), From 671623c521bf79a3f81b146a15a1781e38ae93b3 Mon Sep 17 00:00:00 2001 From: elijah Date: Sat, 7 Jan 2023 17:48:54 +0800 Subject: [PATCH 05/16] chore: add instance test for rename table --- src/datanode/src/mock.rs | 38 +++++++--- src/datanode/src/tests/instance_test.rs | 98 +++++++++++++++++++++++++ src/datanode/src/tests/test_util.rs | 4 + 3 files changed, 129 insertions(+), 11 deletions(-) diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs index 9fa2b9bfe345..3d508ad1a3a4 100644 --- a/src/datanode/src/mock.rs +++ b/src/datanode/src/mock.rs @@ -16,18 +16,21 @@ use std::sync::atomic::{AtomicU32, Ordering}; use std::sync::Arc; use catalog::remote::MetaKvBackend; +use catalog::CatalogManagerRef; use common_catalog::consts::MIN_USER_TABLE_ID; use meta_client::client::{MetaClient, MetaClientBuilder}; use meta_srv::mocks::MockInfo; use mito::config::EngineConfig as TableEngineConfig; use query::QueryEngineFactory; +use servers::Mode; +use snafu::ResultExt; use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; use table::metadata::TableId; use table::table::TableIdProvider; use crate::datanode::DatanodeOptions; -use crate::error::Result; +use crate::error::{CatalogSnafu, Result}; use crate::heartbeat::HeartbeatTask; use crate::instance::{create_log_store, new_object_store, DefaultEngine, Instance}; use crate::script::ScriptExecutor; @@ -53,16 +56,29 @@ impl Instance { object_store, )); - // create remote catalog manager - let catalog_manager = Arc::new(catalog::remote::RemoteCatalogManager::new( - table_engine.clone(), - opts.node_id.unwrap_or(42), - Arc::new(MetaKvBackend { - client: meta_client.clone(), - }), - )); - - let factory = QueryEngineFactory::new(catalog_manager.clone()); + // By default, catalog manager and factory are created in distributed mode + let (catalog_manager, factory) = match opts.mode { + Mode::Standalone => { + let catalog = Arc::new( + catalog::local::LocalCatalogManager::try_new(table_engine.clone()) + .await + .context(CatalogSnafu)?, + ); + let factory = QueryEngineFactory::new(catalog.clone()); + (catalog as CatalogManagerRef, factory) + } + Mode::Distributed => { + let catalog = Arc::new(catalog::remote::RemoteCatalogManager::new( + table_engine.clone(), + opts.node_id.unwrap_or(42), + Arc::new(MetaKvBackend { + client: meta_client.clone(), + }), + )); + let factory = QueryEngineFactory::new(catalog.clone()); + (catalog as CatalogManagerRef, factory) + } + }; let query_engine = factory.query_engine(); let script_executor = ScriptExecutor::new(catalog_manager.clone(), query_engine.clone()).await?; diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index 215cf75a6d87..1fbc5723b3e5 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -342,6 +342,104 @@ pub async fn test_execute_create() { assert!(matches!(output, Output::AffectedRows(0))); } +#[tokio::test] +async fn test_rename_table() { + let instance = MockInstance::new("test_rename_table_local").await; + + let output = execute_sql(&instance, "create database db").await; + assert!(matches!(output, Output::AffectedRows(1))); + + let output = execute_sql_in_db( + &instance, + "create table demo(host string, cpu double, memory double, ts timestamp, time index(ts))", + "db", + ) + .await; + assert!(matches!(output, Output::AffectedRows(0))); + + let output = execute_sql_in_db(&instance, "show tables", "db").await; + let expect = "\ ++--------+ +| Tables | ++--------+ +| demo | ++--------+\ +" + .to_string(); + check_output_stream(output, expect).await; + + // make sure table insertion is ok before altering table name + let output = execute_sql_in_db( + &instance, + "insert into demo(host, cpu, memory, ts) values ('host1', 1.1, 100, 1000)", + "db", + ) + .await; + assert!(matches!(output, Output::AffectedRows(1))); + + // rename table + let output = execute_sql_in_db(&instance, "alter table demo rename test_table", "db").await; + assert!(matches!(output, Output::AffectedRows(0))); + + let output = execute_sql_in_db(&instance, "show tables", "db").await; + let expect = "\ ++------------+ +| Tables | ++------------+ +| test_table | ++------------+\ +" + .to_string(); + check_output_stream(output, expect).await; + + // make sure table insertion is ok after altered table name + let output = execute_sql_in_db( + &instance, + "insert into test_table(host, cpu, memory, ts) values ('host2', 2.2, 200, 2000)", + "db", + ) + .await; + assert!(matches!(output, Output::AffectedRows(1))); + + let output = execute_sql_in_db(&instance, "select * from test_table order by ts", "db").await; + let expected = "\ ++-------+-----+--------+---------------------+ +| host | cpu | memory | ts | ++-------+-----+--------+---------------------+ +| host1 | 1.1 | 100 | 1970-01-01T00:00:01 | +| host2 | 2.2 | 200 | 1970-01-01T00:00:02 | ++-------+-----+--------+---------------------+\ +" + .to_string(); + check_output_stream(output, expected).await; + + // restart instance + assert!(instance.restart().await.is_ok()); + + let output = execute_sql_in_db(&instance, "show tables", "db").await; + let expect = "\ ++------------+ +| Tables | ++------------+ +| test_table | ++------------+\ +" + .to_string(); + check_output_stream(output, expect).await; + + let output = execute_sql_in_db(&instance, "select * from test_table order by ts", "db").await; + let expected = "\ ++-------+-----+--------+---------------------+ +| host | cpu | memory | ts | ++-------+-----+--------+---------------------+ +| host1 | 1.1 | 100 | 1970-01-01T00:00:01 | +| host2 | 2.2 | 200 | 1970-01-01T00:00:02 | ++-------+-----+--------+---------------------+\ +" + .to_string(); + check_output_stream(output, expected).await; +} + #[tokio::test(flavor = "multi_thread")] async fn test_alter_table() { let instance = setup_test_instance("test_alter_table").await; diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index d21603d0e0c3..751c6d0cbaaa 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -52,6 +52,10 @@ impl MockInstance { pub(crate) fn inner(&self) -> &Instance { &self.instance } + + pub(crate) async fn restart(&self) -> Result<()> { + self.instance.start().await + } } struct TestGuard { From f92643dd26d5fc58c360ab46d4b7517b630d135e Mon Sep 17 00:00:00 2001 From: elijah Date: Sat, 7 Jan 2023 18:17:50 +0800 Subject: [PATCH 06/16] chore: fix frontend test --- src/catalog/src/tables.rs | 2 +- src/catalog/tests/local_catalog_tests.rs | 14 ++++++++++++++ src/frontend/src/tests.rs | 1 + 3 files changed, 16 insertions(+), 1 deletion(-) diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs index 38ebb787f2eb..9e64e51e7c42 100644 --- a/src/catalog/src/tables.rs +++ b/src/catalog/src/tables.rs @@ -387,7 +387,7 @@ mod tests { use super::*; use crate::local::memory::new_memory_catalog_list; - use crate::{error, CatalogList}; + use crate::CatalogList; #[tokio::test] async fn test_tables() { diff --git a/src/catalog/tests/local_catalog_tests.rs b/src/catalog/tests/local_catalog_tests.rs index 8c7f31c00570..eaab8606ade7 100644 --- a/src/catalog/tests/local_catalog_tests.rs +++ b/src/catalog/tests/local_catalog_tests.rs @@ -75,6 +75,20 @@ mod tests { .unwrap() .unwrap(); assert_eq!(registered_table.table_info().ident.table_id, table_id); + + // restart catalog manager + assert!(catalog_manager.start().await.is_ok()); + + // check table + let table = catalog_manager + .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name) + .unwrap() + .unwrap(); + assert_eq!(table.table_info().ident.table_id, table_id); + let opt = catalog_manager + .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name) + .unwrap(); + assert!(opt.is_none()); } #[tokio::test] diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs index 80650d25e229..bd0f83f778d7 100644 --- a/src/frontend/src/tests.rs +++ b/src/frontend/src/tests.rs @@ -165,6 +165,7 @@ async fn create_distributed_datanode( storage: ObjectStoreConfig::File { data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), }, + mode: Mode::Distributed, ..Default::default() }; From f502b0ef1d34dbd9a61d9d550e0334e13e4ddd50 Mon Sep 17 00:00:00 2001 From: elijah Date: Sat, 7 Jan 2023 18:31:50 +0800 Subject: [PATCH 07/16] chore: fix comment --- src/datanode/src/mock.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs index 3d508ad1a3a4..ec4304e53afd 100644 --- a/src/datanode/src/mock.rs +++ b/src/datanode/src/mock.rs @@ -56,7 +56,7 @@ impl Instance { object_store, )); - // By default, catalog manager and factory are created in distributed mode + // By default, catalog manager and factory are created in standalone mode let (catalog_manager, factory) = match opts.mode { Mode::Standalone => { let catalog = Arc::new( From 3a3707b338b6d15f15eb20be51c456ccdb4ff52a Mon Sep 17 00:00:00 2001 From: elijah Date: Sat, 7 Jan 2023 21:50:55 +0800 Subject: [PATCH 08/16] chore: fix rename table test --- src/catalog/tests/local_catalog_tests.rs | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/src/catalog/tests/local_catalog_tests.rs b/src/catalog/tests/local_catalog_tests.rs index eaab8606ade7..8c7f31c00570 100644 --- a/src/catalog/tests/local_catalog_tests.rs +++ b/src/catalog/tests/local_catalog_tests.rs @@ -75,20 +75,6 @@ mod tests { .unwrap() .unwrap(); assert_eq!(registered_table.table_info().ident.table_id, table_id); - - // restart catalog manager - assert!(catalog_manager.start().await.is_ok()); - - // check table - let table = catalog_manager - .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name) - .unwrap() - .unwrap(); - assert_eq!(table.table_info().ident.table_id, table_id); - let opt = catalog_manager - .table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name) - .unwrap(); - assert!(opt.is_none()); } #[tokio::test] From 37e8e0a4003d002293313e375fdb4d3888735aeb Mon Sep 17 00:00:00 2001 From: elijah Date: Tue, 10 Jan 2023 01:14:00 +0800 Subject: [PATCH 09/16] fix: renaming a table with an existing name --- src/mito/src/engine.rs | 52 +++++++++++++++++++++++++++++++++++++++--- 1 file changed, 49 insertions(+), 3 deletions(-) diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index e77b751a1bdf..52b64704a275 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -493,6 +493,21 @@ impl MitoEngineInner { let schema_name = req.schema_name.as_deref().unwrap_or(DEFAULT_SCHEMA_NAME); let table_name = &req.table_name.clone(); + if let AlterKind::RenameTable { new_table_name } = &req.alter_kind { + let table_ref = TableReference { + catalog: catalog_name, + schema: schema_name, + table: new_table_name, + }; + + if self.get_table(&table_ref).is_some() { + return TableExistsSnafu { + table_name: table_ref.to_string(), + } + .fail(); + } + } + let mut table_ref = TableReference { catalog: catalog_name, schema: schema_name, @@ -565,7 +580,7 @@ mod tests { use super::*; use crate::table::test_util; - use crate::table::test_util::{new_insert_request, MockRegion, TABLE_NAME}; + use crate::table::test_util::{new_insert_request, schema_for_test, MockRegion, TABLE_NAME}; async fn setup_table_with_column_default_constraint() -> (TempDir, String, TableRef) { let table_name = "test_default_constraint"; @@ -1079,8 +1094,40 @@ mod tests { async fn test_alter_rename_table() { let (engine, table_engine, _table, object_store, _dir) = test_util::setup_mock_engine_and_table().await; + let ctx = EngineContext::default(); + + // register another table + let another_name = "another_table"; + let req = CreateTableRequest { + id: 1024, + catalog_name: DEFAULT_CATALOG_NAME.to_string(), + schema_name: DEFAULT_SCHEMA_NAME.to_string(), + table_name: another_name.to_string(), + desc: Some("another test table".to_string()), + schema: Arc::new(schema_for_test()), + region_numbers: vec![0], + primary_key_indices: vec![0], + create_if_not_exists: true, + table_options: HashMap::new(), + }; + table_engine + .create_table(&ctx, req) + .await + .expect("create table must succeed"); + // test renaming a table with an existing name. + let req = AlterTableRequest { + catalog_name: None, + schema_name: None, + table_name: TABLE_NAME.to_string(), + alter_kind: AlterKind::RenameTable { + new_table_name: another_name.to_string(), + }, + }; + let ret = table_engine.alter_table(&ctx, req).await; + assert!(ret.is_err()); + assert!(matches!(ret, Err(e) if format!("{e:?}").contains("Table already exists"))); - let new_table_name = "table_t"; + let new_table_name = "test_table"; // test rename table let req = AlterTableRequest { catalog_name: None, @@ -1090,7 +1137,6 @@ mod tests { new_table_name: new_table_name.to_string(), }, }; - let ctx = EngineContext::default(); let table = table_engine.alter_table(&ctx, req).await.unwrap(); assert_eq!(table.table_info().name, new_table_name); From 707159a98551cae553f0d9678322e4ab89a95b1f Mon Sep 17 00:00:00 2001 From: elijah Date: Tue, 10 Jan 2023 14:47:08 +0800 Subject: [PATCH 10/16] fix: improve the system catalog's renaming process --- src/catalog/src/local/manager.rs | 4 +- src/catalog/src/system.rs | 90 ++++++++++---------------------- src/catalog/src/tables.rs | 78 ++------------------------- 3 files changed, 33 insertions(+), 139 deletions(-) diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index 6682864e4d13..c53141fb4702 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -405,11 +405,11 @@ impl CatalogManager for LocalCatalogManager { schema: schema_name, })?; + // rename table in system catalog self.system - .rename_table( + .register_table( catalog_name.clone(), schema_name.clone(), - request.table_name.clone(), request.new_table_name.clone(), request.table_id, ) diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index 8a45ab8db439..8f043c83398e 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -32,7 +32,7 @@ use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; use table::engine::{EngineContext, TableEngineRef}; use table::metadata::{TableId, TableInfoRef}; -use table::requests::{CreateTableRequest, DeleteRequest, InsertRequest, OpenTableRequest}; +use table::requests::{CreateTableRequest, InsertRequest, OpenTableRequest}; use table::{Table, TableRef}; use crate::error::{ @@ -73,11 +73,6 @@ impl Table for SystemCatalogTable { self.table.insert(request).await } - /// Delete row from table - async fn delete(&self, request: DeleteRequest) -> table::Result { - self.table.delete(request).await - } - fn table_info(&self) -> TableInfoRef { self.table_info.clone() } @@ -173,17 +168,17 @@ fn build_system_catalog_schema() -> Schema { ColumnSchema::new( "value".to_string(), ConcreteDataType::binary_datatype(), - true, + false, ), ColumnSchema::new( "gmt_created".to_string(), ConcreteDataType::timestamp_millisecond_datatype(), - true, + false, ), ColumnSchema::new( "gmt_modified".to_string(), ConcreteDataType::timestamp_millisecond_datatype(), - true, + false, ), ]; @@ -191,11 +186,22 @@ fn build_system_catalog_schema() -> Schema { SchemaBuilder::try_from(cols).unwrap().build().unwrap() } -pub fn build_table_insert_request(full_table_name: String, table_id: TableId) -> InsertRequest { +/// Formats key string for table entry in system catalog +pub fn format_table_entry_key(catalog: &str, schema: &str, table_id: TableId) -> String { + format!("{catalog}.{schema}.{table_id}") +} + +pub fn build_table_insert_request( + catalog: String, + schema: String, + table_name: String, + table_id: TableId, +) -> InsertRequest { + let entry_key = format_table_entry_key(&catalog, &schema, table_id); build_insert_request( EntryType::Table, - full_table_name.as_bytes(), - serde_json::to_string(&TableEntryValue { table_id }) + entry_key.as_bytes(), + serde_json::to_string(&TableEntryValue { table_name }) .unwrap() .as_bytes(), ) @@ -212,27 +218,6 @@ pub fn build_schema_insert_request(catalog_name: String, schema_name: String) -> ) } -pub fn build_table_delete_request(full_table_name: String) -> DeleteRequest { - build_delete_request(EntryType::Table, full_table_name.as_bytes()) -} - -pub fn build_delete_request(entry_type: EntryType, key: &[u8]) -> DeleteRequest { - let mut key_column_values = HashMap::with_capacity(3); - key_column_values.insert( - "entry_type".to_string(), - Arc::new(UInt8Vector::from_slice(&[entry_type as u8])) as _, - ); - key_column_values.insert( - "key".to_string(), - Arc::new(BinaryVector::from_slice(&[key])) as _, - ); - key_column_values.insert( - "timestamp".to_string(), - Arc::new(TimestampMillisecondVector::from_slice(&[0])) as _, - ); - DeleteRequest { key_column_values } -} - pub fn build_insert_request(entry_type: EntryType, key: &[u8], value: &[u8]) -> InsertRequest { let mut columns_values = HashMap::with_capacity(6); columns_values.insert( @@ -311,8 +296,8 @@ pub fn decode_system_catalog( } EntryType::Table => { - // As for table entry, the key is a string with format: `..` - // and the value is a JSON string with format: `{"table_id": }` + // As for table entry, the key is a string with format: `..` + // and the value is a JSON string with format: `{"table_name": }` let table_parts = key.split('.').collect::>(); ensure!( table_parts.len() >= 3, @@ -324,11 +309,12 @@ pub fn decode_system_catalog( debug!("Table meta value: {}", String::from_utf8_lossy(value)); let table_meta: TableEntryValue = serde_json::from_slice(value).context(ValueDeserializeSnafu)?; + let table_id = table_parts[2].parse::().unwrap(); Ok(Entry::Table(TableEntry { catalog_name: table_parts[0].to_string(), schema_name: table_parts[1].to_string(), - table_name: table_parts[2].to_string(), - table_id: table_meta.table_id, + table_name: table_meta.table_name, + table_id, })) } } @@ -388,7 +374,7 @@ pub struct TableEntry { #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct TableEntryValue { - pub table_id: TableId, + pub table_name: String, } #[cfg(test)] @@ -404,8 +390,6 @@ mod tests { use tempdir::TempDir; use super::*; - use crate::error::DeleteCatalogRecordSnafu; - use crate::format_full_table_name; #[test] pub fn test_decode_catalog_entry() { @@ -443,8 +427,8 @@ mod tests { pub fn test_decode_table() { let entry = decode_system_catalog( Some(EntryType::Table as u8), - Some("some_catalog.some_schema.some_table".as_bytes()), - Some("{\"table_id\":42}".as_bytes()), + Some("some_catalog.some_schema.42".as_bytes()), + Some("{\"table_name\":\"some_table\"}".as_bytes()), ) .unwrap(); @@ -463,7 +447,7 @@ mod tests { pub fn test_decode_mismatch() { decode_system_catalog( Some(EntryType::Table as u8), - Some("some_catalog.some_schema.some_table".as_bytes()), + Some("some_catalog.some_schema.42".as_bytes()), None, ) .unwrap(); @@ -515,24 +499,4 @@ mod tests { assert_eq!(SYSTEM_CATALOG_NAME, info.catalog_name); assert_eq!(INFORMATION_SCHEMA_NAME, info.schema_name); } - - #[tokio::test] - async fn test_system_table_delete() { - let (_dir, table_engine) = prepare_table_engine().await; - let system_table = SystemCatalogTable::new(table_engine).await.unwrap(); - let table_name = "test_table"; - let table_id = 42; - let full_table_name = - format_full_table_name(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name); - let insert_req = build_table_insert_request(full_table_name.clone(), table_id); - - assert!(system_table.insert(insert_req).await.is_ok()); - - let delete_req = build_table_delete_request(full_table_name); - let ret = system_table - .delete(delete_req) - .await - .context(DeleteCatalogRecordSnafu); - assert!(ret.is_ok()); - } } diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs index 9e64e51e7c42..9a02af90383d 100644 --- a/src/catalog/src/tables.rs +++ b/src/catalog/src/tables.rs @@ -38,14 +38,9 @@ use table::metadata::{TableId, TableInfoRef}; use table::table::scan::SimpleTableScan; use table::{Table, TableRef}; -use crate::error::{DeleteCatalogRecordSnafu, Error, InsertCatalogRecordSnafu}; -use crate::system::{ - build_schema_insert_request, build_table_delete_request, build_table_insert_request, - SystemCatalogTable, -}; -use crate::{ - format_full_table_name, CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef, -}; +use crate::error::{Error, InsertCatalogRecordSnafu}; +use crate::system::{build_schema_insert_request, build_table_insert_request, SystemCatalogTable}; +use crate::{CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef}; /// Tables holds all tables created by user. pub struct Tables { @@ -281,8 +276,7 @@ impl SystemCatalog { table_name: String, table_id: TableId, ) -> crate::error::Result { - let full_table_name = format_full_table_name(&catalog, &schema, &table_name); - let request = build_table_insert_request(full_table_name, table_id); + let request = build_table_insert_request(catalog, schema, table_name, table_id); self.information_schema .system .insert(request) @@ -290,26 +284,6 @@ impl SystemCatalog { .context(InsertCatalogRecordSnafu) } - pub async fn rename_table( - &self, - catalog: String, - schema: String, - table_name: String, - new_table_name: String, - table_id: TableId, - ) -> crate::error::Result { - let full_table_name = format_full_table_name(&catalog, &schema, &table_name); - let delete_table_req = build_table_delete_request(full_table_name); - self.information_schema - .system - .delete(delete_table_req) - .await - .context(DeleteCatalogRecordSnafu)?; - - self.register_table(catalog, schema, new_table_name, table_id) - .await - } - pub async fn register_schema( &self, catalog: String, @@ -381,8 +355,6 @@ mod tests { use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::physical_plan::SessionContext; use futures_util::StreamExt; - use mito::config::EngineConfig; - use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine}; use table::table::numbers::NumbersTable; use super::*; @@ -451,46 +423,4 @@ mod tests { panic!("Record batch should not be empty!") } } - - async fn create_system_catalog() -> Result { - let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await; - let mock_engine = Arc::new(MockMitoEngine::new( - EngineConfig::default(), - MockEngine::default(), - object_store, - )); - let table = SystemCatalogTable::new(mock_engine.clone()).await?; - let memory_catalog_list = new_memory_catalog_list().unwrap(); - Ok(SystemCatalog::new(table, memory_catalog_list, mock_engine)) - } - - #[tokio::test] - async fn test_rename_table() { - let system_catalog = create_system_catalog().await.unwrap(); - let catalog = DEFAULT_CATALOG_NAME.to_string(); - let schema = DEFAULT_SCHEMA_NAME.to_string(); - let table_name = "test_table"; - let table_id = 42; - assert!(system_catalog - .register_table( - catalog.clone(), - schema.clone(), - table_name.to_string(), - table_id - ) - .await - .is_ok()); - - let new_table_name = "demo"; - let ret = system_catalog - .rename_table( - catalog.clone(), - schema.clone(), - table_name.to_string(), - new_table_name.to_string(), - table_id, - ) - .await; - assert!(ret.is_ok()); - } } From e25d12febc3414f9f4542cf83d179481eb5b826e Mon Sep 17 00:00:00 2001 From: elijah Date: Wed, 11 Jan 2023 12:55:20 +0800 Subject: [PATCH 11/16] chore: improve the code --- src/catalog/src/error.rs | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/src/catalog/src/error.rs b/src/catalog/src/error.rs index 89c2fa379a6c..a2b1657642bd 100644 --- a/src/catalog/src/error.rs +++ b/src/catalog/src/error.rs @@ -142,15 +142,6 @@ pub enum Error { source: table::error::Error, }, - #[snafu(display( - "Failed to delete table creation record to system catalog, source: {}", - source - ))] - DeleteCatalogRecord { - #[snafu(backtrace)] - source: table::error::Error, - }, - #[snafu(display("Illegal catalog manager state: {}", msg))] IllegalManagerState { backtrace: Backtrace, msg: String }, @@ -246,7 +237,6 @@ impl ErrorExt for Error { Error::OpenSystemCatalog { source, .. } | Error::CreateSystemCatalog { source, .. } | Error::InsertCatalogRecord { source, .. } - | Error::DeleteCatalogRecord { source, .. } | Error::OpenTable { source, .. } | Error::CreateTable { source, .. } => source.status_code(), Error::MetaSrv { source, .. } => source.status_code(), From 237cbfa3eabcf5445e8306909692b7b2f7e68526 Mon Sep 17 00:00:00 2001 From: elijah <30852919+e1ijah1@users.noreply.github.com> Date: Wed, 11 Jan 2023 14:49:55 +0800 Subject: [PATCH 12/16] chore: improve the comment Co-authored-by: Yingwen --- src/catalog/src/lib.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index b1dbda2e160c..0408c524a942 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -97,8 +97,7 @@ pub trait CatalogManager: CatalogList { /// schema registered. async fn register_schema(&self, request: RegisterSchemaRequest) -> Result; - /// Rename a table within given catalog/schema/table_name to catalog manager, - /// returns whether the table renamed. + /// Rename a table to [RenameTableRequest::new_table_name], returns whether the table is renamed. async fn rename_table(&self, request: RenameTableRequest) -> Result; /// Register a system table, should be called before starting the manager. From 886c606a9d44fd3f3ecef1268cf0a150af9db08c Mon Sep 17 00:00:00 2001 From: elijah Date: Wed, 11 Jan 2023 19:43:34 +0800 Subject: [PATCH 13/16] chore: improve the code --- src/catalog/src/lib.rs | 19 +---- src/catalog/src/local/manager.rs | 10 +-- src/catalog/src/local/memory.rs | 47 ++++------ src/catalog/src/remote/manager.rs | 20 ++--- src/catalog/src/schema.rs | 7 +- src/catalog/src/system.rs | 3 +- src/catalog/src/tables.rs | 9 +- src/catalog/tests/local_catalog_tests.rs | 4 +- src/datanode/src/sql.rs | 7 +- src/datanode/src/sql/alter.rs | 10 +-- src/datanode/src/tests/instance_test.rs | 95 +++++++-------------- src/datanode/src/tests/test_util.rs | 24 +++++- src/frontend/src/catalog.rs | 14 +-- src/mito/src/engine.rs | 13 +-- src/query/src/datafusion/catalog_adapter.rs | 7 +- src/table/src/requests.rs | 6 ++ 16 files changed, 115 insertions(+), 180 deletions(-) diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 0408c524a942..60ea456a7997 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -98,7 +98,7 @@ pub trait CatalogManager: CatalogList { async fn register_schema(&self, request: RegisterSchemaRequest) -> Result; /// Rename a table to [RenameTableRequest::new_table_name], returns whether the table is renamed. - async fn rename_table(&self, request: RenameTableRequest) -> Result; + async fn rename_table(&self, request: RenameTableRequest, table_id: TableId) -> Result; /// Register a system table, should be called before starting the manager. async fn register_system_table(&self, request: RegisterSystemTableRequest) @@ -145,27 +145,12 @@ impl Debug for RegisterTableRequest { } } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct RenameTableRequest { pub catalog: String, pub schema: String, pub table_name: String, pub new_table_name: String, - pub table_id: TableId, - pub table: TableRef, -} - -impl Debug for RenameTableRequest { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("RenameTableRequest") - .field("catalog", &self.catalog) - .field("schema", &self.schema) - .field("table_name", &self.table_name) - .field("new_table_name", &self.new_table_name) - .field("table_id", &self.table_id) - .field("table", &self.table.table_info()) - .finish() - } } #[derive(Clone)] diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index c53141fb4702..3234f8ffdaa1 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -380,7 +380,7 @@ impl CatalogManager for LocalCatalogManager { } } - async fn rename_table(&self, request: RenameTableRequest) -> Result { + async fn rename_table(&self, request: RenameTableRequest, table_id: TableId) -> Result { let started = self.init_lock.lock().await; ensure!( @@ -411,12 +411,12 @@ impl CatalogManager for LocalCatalogManager { catalog_name.clone(), schema_name.clone(), request.new_table_name.clone(), - request.table_id, + table_id, ) .await?; - schema - .rename_table(&request.table_name, request.new_table_name, request.table) - .map(|v| v.is_none()) + Ok(schema + .rename_table(&request.table_name, request.new_table_name) + .is_ok()) } async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result { diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index 0627e42e5a46..e2eb72cf26a1 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -92,7 +92,7 @@ impl CatalogManager for MemoryCatalogManager { .map(|v| v.is_none()) } - async fn rename_table(&self, request: RenameTableRequest) -> Result { + async fn rename_table(&self, request: RenameTableRequest, _table_id: TableId) -> Result { let catalogs = self.catalogs.write().unwrap(); let catalog = catalogs .get(&request.catalog) @@ -106,9 +106,9 @@ impl CatalogManager for MemoryCatalogManager { catalog: &request.catalog, schema: &request.schema, })?; - schema - .rename_table(&request.table_name, request.new_table_name, request.table) - .map(|v| v.is_none()) + Ok(schema + .rename_table(&request.table_name, request.new_table_name) + .is_ok()) } async fn deregister_table(&self, request: DeregisterTableRequest) -> Result { @@ -312,25 +312,11 @@ impl SchemaProvider for MemorySchemaProvider { } } - fn rename_table( - &self, - name: &str, - new_name: String, - table: TableRef, - ) -> Result> { + fn rename_table(&self, name: &str, new_name: String) -> Result { let mut tables = self.tables.write().unwrap(); - if let Some(existing) = tables.get(name) { - // if table with the same name but different table id exists, then it's a fatal bug - if existing.table_info().ident.table_id != table.table_info().ident.table_id { - error!( - "Unexpected table rename: {:?}, existing: {:?}", - table.table_info(), - existing.table_info() - ); - return TableExistsSnafu { table: name }.fail()?; - } - tables.remove(name); - Ok(tables.insert(new_name, table)) + if tables.get(name).is_some() { + let table = tables.remove(name).unwrap(); + Ok(tables.insert(new_name, table).unwrap()) } else { TableNotFoundSnafu { table_info: name.to_string(), @@ -418,10 +404,9 @@ mod tests { // rename test table let new_table_name = "numbers"; - assert!(provider - .rename_table(table_name, new_table_name.to_string(), test_table.clone(),) - .unwrap() - .is_none()); + provider + .rename_table(table_name, new_table_name.to_string()) + .unwrap(); // test old table name not exist assert!(!provider.table_exist(table_name).unwrap()); @@ -438,7 +423,6 @@ mod tests { let other_table = Arc::new(NumbersTable::new(2)); let result = provider.register_table(new_table_name.to_string(), other_table); let err = result.err().unwrap(); - assert!(err.backtrace_opt().is_some()); assert_eq!(StatusCode::TableAlreadyExists, err.status_code()); } @@ -459,7 +443,7 @@ mod tests { schema: DEFAULT_SCHEMA_NAME.to_string(), table_name: table_name.to_string(), table_id, - table: table.clone(), + table, }; assert!(catalog.register_table(register_table_req).await.unwrap()); assert!(schema.table_exist(table_name).unwrap()); @@ -471,10 +455,11 @@ mod tests { schema: DEFAULT_SCHEMA_NAME.to_string(), table_name: table_name.to_string(), new_table_name: new_table_name.to_string(), - table_id, - table: table.clone(), }; - assert!(catalog.rename_table(rename_table_req).await.unwrap()); + assert!(catalog + .rename_table(rename_table_req, table_id) + .await + .unwrap()); assert!(!schema.table_exist(table_name).unwrap()); assert!(schema.table_exist(new_table_name).unwrap()); diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index ab247b6c7255..e57fa95ef9a6 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -449,9 +449,11 @@ impl CatalogManager for RemoteCatalogManager { Ok(true) } - async fn rename_table(&self, _request: RenameTableRequest) -> Result { - // todo impl rename_table for catalog manager - todo!() + async fn rename_table(&self, _request: RenameTableRequest, _table_id: TableId) -> Result { + UnimplementedSnafu { + operation: "rename table", + } + .fail() } async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> { @@ -744,13 +746,11 @@ impl SchemaProvider for RemoteSchemaProvider { prev } - fn rename_table( - &self, - _name: &str, - _new_name: String, - _table: TableRef, - ) -> Result> { - todo!() + fn rename_table(&self, _name: &str, _new_name: String) -> Result { + UnimplementedSnafu { + operation: "rename table", + } + .fail() } fn deregister_table(&self, name: &str) -> Result> { diff --git a/src/catalog/src/schema.rs b/src/catalog/src/schema.rs index 6d24d64a5a83..0dc9ddab88fe 100644 --- a/src/catalog/src/schema.rs +++ b/src/catalog/src/schema.rs @@ -37,12 +37,7 @@ pub trait SchemaProvider: Sync + Send { /// If supported by the implementation, renames an existing table from this schema and returns it. /// If no table of that name exists, returns "Table not found" error. - fn rename_table( - &self, - name: &str, - new_name: String, - table: TableRef, - ) -> Result>; + fn rename_table(&self, name: &str, new_name: String) -> Result; /// If supported by the implementation, removes an existing table from this schema and returns it. /// If no table of that name exists, returns Ok(None). diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index 8f043c83398e..c163979cb3bc 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -187,6 +187,7 @@ fn build_system_catalog_schema() -> Schema { } /// Formats key string for table entry in system catalog +#[inline] pub fn format_table_entry_key(catalog: &str, schema: &str, table_id: TableId) -> String { format!("{catalog}.{schema}.{table_id}") } @@ -309,7 +310,7 @@ pub fn decode_system_catalog( debug!("Table meta value: {}", String::from_utf8_lossy(value)); let table_meta: TableEntryValue = serde_json::from_slice(value).context(ValueDeserializeSnafu)?; - let table_id = table_parts[2].parse::().unwrap(); + let table_id = table_parts[2].parse::().unwrap(); Ok(Entry::Table(TableEntry { catalog_name: table_parts[0].to_string(), schema_name: table_parts[1].to_string(), diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs index 9a02af90383d..971b7524900c 100644 --- a/src/catalog/src/tables.rs +++ b/src/catalog/src/tables.rs @@ -231,13 +231,8 @@ impl SchemaProvider for InformationSchema { panic!("System catalog & schema does not support register table") } - fn rename_table( - &self, - _name: &str, - _new_name: String, - _table: TableRef, - ) -> crate::error::Result> { - panic!("System catalog & schema does not support rename table") + fn rename_table(&self, _name: &str, _new_name: String) -> crate::error::Result { + unimplemented!("System catalog & schema does not support rename table") } fn deregister_table(&self, _name: &str) -> crate::error::Result> { diff --git a/src/catalog/tests/local_catalog_tests.rs b/src/catalog/tests/local_catalog_tests.rs index 8c7f31c00570..840e10e8950e 100644 --- a/src/catalog/tests/local_catalog_tests.rs +++ b/src/catalog/tests/local_catalog_tests.rs @@ -62,11 +62,9 @@ mod tests { schema: DEFAULT_SCHEMA_NAME.to_string(), table_name: table_name.to_string(), new_table_name: new_table_name.to_string(), - table_id, - table: table.clone(), }; assert!(catalog_manager - .rename_table(rename_table_req) + .rename_table(rename_table_req, table_id) .await .unwrap()); diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 760ad4c59da7..e885bcdab08d 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -209,12 +209,7 @@ mod tests { unimplemented!(); } - fn rename_table( - &self, - _name: &str, - _new_name: String, - _table: TableRef, - ) -> catalog::error::Result> { + fn rename_table(&self, _name: &str, _new_name: String) -> catalog::error::Result { unimplemented!() } diff --git a/src/datanode/src/sql/alter.rs b/src/datanode/src/sql/alter.rs index 5d5f321fe3d1..d7724ca7b99f 100644 --- a/src/datanode/src/sql/alter.rs +++ b/src/datanode/src/sql/alter.rs @@ -44,7 +44,7 @@ impl SqlHandler { table_name: &full_table_name, } ); - let kind = req.alter_kind.clone(); + let is_rename = req.is_rename_table(); let table = self.table_engine .alter_table(&ctx, req) @@ -52,18 +52,16 @@ impl SqlHandler { .context(error::AlterTableSnafu { table_name: full_table_name, })?; - let table_info = &table.table_info(); - if let AlterKind::RenameTable { .. } = kind { + if is_rename { + let table_info = &table.table_info(); let rename_table_req = RenameTableRequest { catalog: table_info.catalog_name.clone(), schema: table_info.schema_name.clone(), table_name, new_table_name: table_info.name.clone(), - table_id: table_info.ident.table_id, - table, }; self.catalog_manager - .rename_table(rename_table_req) + .rename_table(rename_table_req, table_info.ident.table_id) .await .context(error::RenameTableSnafu)?; } diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index 1fbc5723b3e5..4ee4c3a6278d 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -342,6 +342,31 @@ pub async fn test_execute_create() { assert!(matches!(output, Output::AffectedRows(0))); } +async fn check_show_tables_and_select(instance: &MockInstance) { + let output = execute_sql_in_db(instance, "show tables", "db").await; + let expect = "\ ++------------+ +| Tables | ++------------+ +| test_table | ++------------+\ +" + .to_string(); + check_output_stream(output, expect).await; + + let output = execute_sql_in_db(instance, "select * from test_table order by ts", "db").await; + let expected = "\ ++-------+-----+--------+---------------------+ +| host | cpu | memory | ts | ++-------+-----+--------+---------------------+ +| host1 | 1.1 | 100 | 1970-01-01T00:00:01 | +| host2 | 2.2 | 200 | 1970-01-01T00:00:02 | ++-------+-----+--------+---------------------+\ +" + .to_string(); + check_output_stream(output, expected).await; +} + #[tokio::test] async fn test_rename_table() { let instance = MockInstance::new("test_rename_table_local").await; @@ -357,21 +382,10 @@ async fn test_rename_table() { .await; assert!(matches!(output, Output::AffectedRows(0))); - let output = execute_sql_in_db(&instance, "show tables", "db").await; - let expect = "\ -+--------+ -| Tables | -+--------+ -| demo | -+--------+\ -" - .to_string(); - check_output_stream(output, expect).await; - // make sure table insertion is ok before altering table name let output = execute_sql_in_db( &instance, - "insert into demo(host, cpu, memory, ts) values ('host1', 1.1, 100, 1000)", + "insert into demo(host, cpu, memory, ts) values ('host1', 1.1, 100, 1000), ('host2', 2.2, 200, 2000)", "db", ) .await; @@ -381,63 +395,12 @@ async fn test_rename_table() { let output = execute_sql_in_db(&instance, "alter table demo rename test_table", "db").await; assert!(matches!(output, Output::AffectedRows(0))); - let output = execute_sql_in_db(&instance, "show tables", "db").await; - let expect = "\ -+------------+ -| Tables | -+------------+ -| test_table | -+------------+\ -" - .to_string(); - check_output_stream(output, expect).await; - - // make sure table insertion is ok after altered table name - let output = execute_sql_in_db( - &instance, - "insert into test_table(host, cpu, memory, ts) values ('host2', 2.2, 200, 2000)", - "db", - ) - .await; - assert!(matches!(output, Output::AffectedRows(1))); - - let output = execute_sql_in_db(&instance, "select * from test_table order by ts", "db").await; - let expected = "\ -+-------+-----+--------+---------------------+ -| host | cpu | memory | ts | -+-------+-----+--------+---------------------+ -| host1 | 1.1 | 100 | 1970-01-01T00:00:01 | -| host2 | 2.2 | 200 | 1970-01-01T00:00:02 | -+-------+-----+--------+---------------------+\ -" - .to_string(); - check_output_stream(output, expected).await; + check_show_tables_and_select(&instance).await; // restart instance - assert!(instance.restart().await.is_ok()); + let instance = MockInstance::create_by_options(instance.options()).await; - let output = execute_sql_in_db(&instance, "show tables", "db").await; - let expect = "\ -+------------+ -| Tables | -+------------+ -| test_table | -+------------+\ -" - .to_string(); - check_output_stream(output, expect).await; - - let output = execute_sql_in_db(&instance, "select * from test_table order by ts", "db").await; - let expected = "\ -+-------+-----+--------+---------------------+ -| host | cpu | memory | ts | -+-------+-----+--------+---------------------+ -| host1 | 1.1 | 100 | 1970-01-01T00:00:01 | -| host2 | 2.2 | 200 | 1970-01-01T00:00:02 | -+-------+-----+--------+---------------------+\ -" - .to_string(); - check_output_stream(output, expected).await; + check_show_tables_and_select(&instance).await; } #[tokio::test(flavor = "multi_thread")] diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 751c6d0cbaaa..419d37b3eacf 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -36,7 +36,8 @@ use crate::sql::SqlHandler; pub(crate) struct MockInstance { instance: Instance, - _guard: TestGuard, + opts: DatanodeOptions, + _guard: Option, } impl MockInstance { @@ -46,15 +47,30 @@ impl MockInstance { let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); instance.start().await.unwrap(); - MockInstance { instance, _guard } + MockInstance { + instance, + opts, + _guard: Some(_guard), + } + } + + pub(crate) async fn create_by_options(opts: DatanodeOptions) -> Self { + let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); + instance.start().await.unwrap(); + + MockInstance { + instance, + opts, + _guard: None, + } } pub(crate) fn inner(&self) -> &Instance { &self.instance } - pub(crate) async fn restart(&self) -> Result<()> { - self.instance.start().await + pub(crate) fn options(&self) -> DatanodeOptions { + self.opts.clone() } } diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 3681a36f57ac..ade0076deec4 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -30,6 +30,7 @@ use catalog::{ use futures::StreamExt; use meta_client::rpc::TableName; use snafu::prelude::*; +use table::metadata::TableId; use table::TableRef; use crate::datanode::DatanodeClients; @@ -97,7 +98,11 @@ impl CatalogManager for FrontendCatalogManager { unimplemented!() } - async fn rename_table(&self, _request: RenameTableRequest) -> catalog_err::Result { + async fn rename_table( + &self, + _request: RenameTableRequest, + _table_id: TableId, + ) -> catalog_err::Result { unimplemented!() } @@ -326,12 +331,7 @@ impl SchemaProvider for FrontendSchemaProvider { unimplemented!("Frontend schema provider does not support register table") } - fn rename_table( - &self, - _name: &str, - _new_name: String, - _table: TableRef, - ) -> catalog_err::Result> { + fn rename_table(&self, _name: &str, _new_name: String) -> catalog_err::Result { unimplemented!("Frontend schema provider does not support rename table") } diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index 52b64704a275..b837f168808d 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -525,9 +525,10 @@ impl MitoEngineInner { if let AlterKind::RenameTable { new_table_name } = &req.alter_kind { table_ref.table = new_table_name.as_str(); + let full_table_name = table_ref.to_string(); let mut tables = self.tables.write().unwrap(); - tables.remove(&table_ref.to_string()); - tables.insert(table_ref.to_string(), table.clone()); + tables.remove(&full_table_name); + tables.insert(full_table_name, table.clone()); } Ok(table) } @@ -1123,9 +1124,11 @@ mod tests { new_table_name: another_name.to_string(), }, }; - let ret = table_engine.alter_table(&ctx, req).await; - assert!(ret.is_err()); - assert!(matches!(ret, Err(e) if format!("{e:?}").contains("Table already exists"))); + let err = table_engine.alter_table(&ctx, req).await.err().unwrap(); + assert!( + err.to_string().contains("Table already exists"), + "Unexpected error: {err}" + ); let new_table_name = "test_table"; // test rename table diff --git a/src/query/src/datafusion/catalog_adapter.rs b/src/query/src/datafusion/catalog_adapter.rs index 3a176154a566..d5bfd7ad62c6 100644 --- a/src/query/src/datafusion/catalog_adapter.rs +++ b/src/query/src/datafusion/catalog_adapter.rs @@ -231,12 +231,7 @@ impl SchemaProvider for SchemaProviderAdapter { .map(|_| table)) } - fn rename_table( - &self, - _name: &str, - _new_name: String, - _table: TableRef, - ) -> catalog_error::Result> { + fn rename_table(&self, _name: &str, _new_name: String) -> catalog_error::Result { todo!() } diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 780d26788f80..4d6a6b1b8746 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -70,6 +70,12 @@ pub struct AlterTableRequest { pub alter_kind: AlterKind, } +impl AlterTableRequest { + pub fn is_rename_table(&self) -> bool { + matches!(self.alter_kind, AlterKind::RenameTable { .. }) + } +} + /// Add column request #[derive(Debug, Clone)] pub struct AddColumnRequest { From 731d65225539aa2c9428bf992dfda1d1473ea177 Mon Sep 17 00:00:00 2001 From: elijah Date: Thu, 12 Jan 2023 01:26:44 +0800 Subject: [PATCH 14/16] chore: fix tests --- src/catalog/src/local/memory.rs | 3 ++- src/datanode/src/tests/instance_test.rs | 7 +++---- src/datanode/src/tests/test_util.rs | 27 +++++++++---------------- 3 files changed, 14 insertions(+), 23 deletions(-) diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index e2eb72cf26a1..c885126cf819 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -316,7 +316,8 @@ impl SchemaProvider for MemorySchemaProvider { let mut tables = self.tables.write().unwrap(); if tables.get(name).is_some() { let table = tables.remove(name).unwrap(); - Ok(tables.insert(new_name, table).unwrap()) + tables.insert(new_name, table.clone()); + Ok(table) } else { TableNotFoundSnafu { table_info: name.to_string(), diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index 4ee4c3a6278d..5ba11bf7dc5f 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -369,7 +369,7 @@ async fn check_show_tables_and_select(instance: &MockInstance) { #[tokio::test] async fn test_rename_table() { - let instance = MockInstance::new("test_rename_table_local").await; + let mut instance = MockInstance::new("test_rename_table_local").await; let output = execute_sql(&instance, "create database db").await; assert!(matches!(output, Output::AffectedRows(1))); @@ -389,7 +389,7 @@ async fn test_rename_table() { "db", ) .await; - assert!(matches!(output, Output::AffectedRows(1))); + assert!(matches!(output, Output::AffectedRows(2))); // rename table let output = execute_sql_in_db(&instance, "alter table demo rename test_table", "db").await; @@ -398,8 +398,7 @@ async fn test_rename_table() { check_show_tables_and_select(&instance).await; // restart instance - let instance = MockInstance::create_by_options(instance.options()).await; - + instance.restart().await; check_show_tables_and_select(&instance).await; } diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 419d37b3eacf..1e82abc02dac 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -31,13 +31,13 @@ use tempdir::TempDir; use crate::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig}; use crate::error::{CreateTableSnafu, Result}; -use crate::instance::Instance; +use crate::instance::{Instance, InstanceRef}; use crate::sql::SqlHandler; pub(crate) struct MockInstance { - instance: Instance, + instance: InstanceRef, opts: DatanodeOptions, - _guard: Option, + _guard: TestGuard, } impl MockInstance { @@ -48,29 +48,20 @@ impl MockInstance { instance.start().await.unwrap(); MockInstance { - instance, + instance: Arc::new(instance), opts, - _guard: Some(_guard), + _guard, } } - pub(crate) async fn create_by_options(opts: DatanodeOptions) -> Self { - let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); + pub(crate) async fn restart(&mut self) { + let instance = Instance::with_mock_meta_client(&self.opts).await.unwrap(); instance.start().await.unwrap(); - - MockInstance { - instance, - opts, - _guard: None, - } + self.instance = Arc::new(instance); } pub(crate) fn inner(&self) -> &Instance { - &self.instance - } - - pub(crate) fn options(&self) -> DatanodeOptions { - self.opts.clone() + self.instance.as_ref() } } From 0163e9320f0df7b5a9000a8f2e294ecac7fd08eb Mon Sep 17 00:00:00 2001 From: elijah Date: Thu, 12 Jan 2023 18:59:02 +0800 Subject: [PATCH 15/16] chore: fix instance_test --- src/datanode/src/tests/instance_test.rs | 52 +++++++++++-------------- src/datanode/src/tests/test_util.rs | 19 ++------- 2 files changed, 26 insertions(+), 45 deletions(-) diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index 5ba11bf7dc5f..89e7f3c4067d 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -342,34 +342,9 @@ pub async fn test_execute_create() { assert!(matches!(output, Output::AffectedRows(0))); } -async fn check_show_tables_and_select(instance: &MockInstance) { - let output = execute_sql_in_db(instance, "show tables", "db").await; - let expect = "\ -+------------+ -| Tables | -+------------+ -| test_table | -+------------+\ -" - .to_string(); - check_output_stream(output, expect).await; - - let output = execute_sql_in_db(instance, "select * from test_table order by ts", "db").await; - let expected = "\ -+-------+-----+--------+---------------------+ -| host | cpu | memory | ts | -+-------+-----+--------+---------------------+ -| host1 | 1.1 | 100 | 1970-01-01T00:00:01 | -| host2 | 2.2 | 200 | 1970-01-01T00:00:02 | -+-------+-----+--------+---------------------+\ -" - .to_string(); - check_output_stream(output, expected).await; -} - #[tokio::test] async fn test_rename_table() { - let mut instance = MockInstance::new("test_rename_table_local").await; + let instance = MockInstance::new("test_rename_table_local").await; let output = execute_sql(&instance, "create database db").await; assert!(matches!(output, Output::AffectedRows(1))); @@ -395,11 +370,28 @@ async fn test_rename_table() { let output = execute_sql_in_db(&instance, "alter table demo rename test_table", "db").await; assert!(matches!(output, Output::AffectedRows(0))); - check_show_tables_and_select(&instance).await; + let output = execute_sql_in_db(&instance, "show tables", "db").await; + let expect = "\ ++------------+ +| Tables | ++------------+ +| test_table | ++------------+\ +" + .to_string(); + check_output_stream(output, expect).await; - // restart instance - instance.restart().await; - check_show_tables_and_select(&instance).await; + let output = execute_sql_in_db(&instance, "select * from test_table order by ts", "db").await; + let expected = "\ ++-------+-----+--------+---------------------+ +| host | cpu | memory | ts | ++-------+-----+--------+---------------------+ +| host1 | 1.1 | 100 | 1970-01-01T00:00:01 | +| host2 | 2.2 | 200 | 1970-01-01T00:00:02 | ++-------+-----+--------+---------------------+\ +" + .to_string(); + check_output_stream(output, expected).await; } #[tokio::test(flavor = "multi_thread")] diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 1e82abc02dac..d21603d0e0c3 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -31,12 +31,11 @@ use tempdir::TempDir; use crate::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig}; use crate::error::{CreateTableSnafu, Result}; -use crate::instance::{Instance, InstanceRef}; +use crate::instance::Instance; use crate::sql::SqlHandler; pub(crate) struct MockInstance { - instance: InstanceRef, - opts: DatanodeOptions, + instance: Instance, _guard: TestGuard, } @@ -47,21 +46,11 @@ impl MockInstance { let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); instance.start().await.unwrap(); - MockInstance { - instance: Arc::new(instance), - opts, - _guard, - } - } - - pub(crate) async fn restart(&mut self) { - let instance = Instance::with_mock_meta_client(&self.opts).await.unwrap(); - instance.start().await.unwrap(); - self.instance = Arc::new(instance); + MockInstance { instance, _guard } } pub(crate) fn inner(&self) -> &Instance { - self.instance.as_ref() + &self.instance } } From b5487c9e9f17fc57c0495b1c88e8a1cfc0b98e49 Mon Sep 17 00:00:00 2001 From: elijah Date: Thu, 12 Jan 2023 19:25:10 +0800 Subject: [PATCH 16/16] chore: improve the code --- src/catalog/src/lib.rs | 3 ++- src/catalog/src/local/manager.rs | 4 ++-- src/catalog/src/local/memory.rs | 8 +++----- src/catalog/src/remote/manager.rs | 2 +- src/catalog/tests/local_catalog_tests.rs | 3 ++- src/datanode/src/sql/alter.rs | 3 ++- src/frontend/src/catalog.rs | 7 +------ 7 files changed, 13 insertions(+), 17 deletions(-) diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 60ea456a7997..a387356db2c2 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -98,7 +98,7 @@ pub trait CatalogManager: CatalogList { async fn register_schema(&self, request: RegisterSchemaRequest) -> Result; /// Rename a table to [RenameTableRequest::new_table_name], returns whether the table is renamed. - async fn rename_table(&self, request: RenameTableRequest, table_id: TableId) -> Result; + async fn rename_table(&self, request: RenameTableRequest) -> Result; /// Register a system table, should be called before starting the manager. async fn register_system_table(&self, request: RegisterSystemTableRequest) @@ -151,6 +151,7 @@ pub struct RenameTableRequest { pub schema: String, pub table_name: String, pub new_table_name: String, + pub table_id: TableId, } #[derive(Clone)] diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index 3234f8ffdaa1..27178941ee0f 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -380,7 +380,7 @@ impl CatalogManager for LocalCatalogManager { } } - async fn rename_table(&self, request: RenameTableRequest, table_id: TableId) -> Result { + async fn rename_table(&self, request: RenameTableRequest) -> Result { let started = self.init_lock.lock().await; ensure!( @@ -411,7 +411,7 @@ impl CatalogManager for LocalCatalogManager { catalog_name.clone(), schema_name.clone(), request.new_table_name.clone(), - table_id, + request.table_id, ) .await?; Ok(schema diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index c885126cf819..285809321951 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -92,7 +92,7 @@ impl CatalogManager for MemoryCatalogManager { .map(|v| v.is_none()) } - async fn rename_table(&self, request: RenameTableRequest, _table_id: TableId) -> Result { + async fn rename_table(&self, request: RenameTableRequest) -> Result { let catalogs = self.catalogs.write().unwrap(); let catalog = catalogs .get(&request.catalog) @@ -456,11 +456,9 @@ mod tests { schema: DEFAULT_SCHEMA_NAME.to_string(), table_name: table_name.to_string(), new_table_name: new_table_name.to_string(), + table_id, }; - assert!(catalog - .rename_table(rename_table_req, table_id) - .await - .unwrap()); + assert!(catalog.rename_table(rename_table_req).await.unwrap()); assert!(!schema.table_exist(table_name).unwrap()); assert!(schema.table_exist(new_table_name).unwrap()); diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index e57fa95ef9a6..a930c29d320c 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -449,7 +449,7 @@ impl CatalogManager for RemoteCatalogManager { Ok(true) } - async fn rename_table(&self, _request: RenameTableRequest, _table_id: TableId) -> Result { + async fn rename_table(&self, _request: RenameTableRequest) -> Result { UnimplementedSnafu { operation: "rename table", } diff --git a/src/catalog/tests/local_catalog_tests.rs b/src/catalog/tests/local_catalog_tests.rs index 840e10e8950e..1a68ef96ab07 100644 --- a/src/catalog/tests/local_catalog_tests.rs +++ b/src/catalog/tests/local_catalog_tests.rs @@ -62,9 +62,10 @@ mod tests { schema: DEFAULT_SCHEMA_NAME.to_string(), table_name: table_name.to_string(), new_table_name: new_table_name.to_string(), + table_id, }; assert!(catalog_manager - .rename_table(rename_table_req, table_id) + .rename_table(rename_table_req) .await .unwrap()); diff --git a/src/datanode/src/sql/alter.rs b/src/datanode/src/sql/alter.rs index d7724ca7b99f..a68b0ccf3d3a 100644 --- a/src/datanode/src/sql/alter.rs +++ b/src/datanode/src/sql/alter.rs @@ -59,9 +59,10 @@ impl SqlHandler { schema: table_info.schema_name.clone(), table_name, new_table_name: table_info.name.clone(), + table_id: table_info.ident.table_id, }; self.catalog_manager - .rename_table(rename_table_req, table_info.ident.table_id) + .rename_table(rename_table_req) .await .context(error::RenameTableSnafu)?; } diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index ade0076deec4..b610fa24f9b0 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -30,7 +30,6 @@ use catalog::{ use futures::StreamExt; use meta_client::rpc::TableName; use snafu::prelude::*; -use table::metadata::TableId; use table::TableRef; use crate::datanode::DatanodeClients; @@ -98,11 +97,7 @@ impl CatalogManager for FrontendCatalogManager { unimplemented!() } - async fn rename_table( - &self, - _request: RenameTableRequest, - _table_id: TableId, - ) -> catalog_err::Result { + async fn rename_table(&self, _request: RenameTableRequest) -> catalog_err::Result { unimplemented!() }