From 886c606a9d44fd3f3ecef1268cf0a150af9db08c Mon Sep 17 00:00:00 2001 From: elijah Date: Wed, 11 Jan 2023 19:43:34 +0800 Subject: [PATCH] chore: improve the code --- src/catalog/src/lib.rs | 19 +---- src/catalog/src/local/manager.rs | 10 +-- src/catalog/src/local/memory.rs | 47 ++++------ src/catalog/src/remote/manager.rs | 20 ++--- src/catalog/src/schema.rs | 7 +- src/catalog/src/system.rs | 3 +- src/catalog/src/tables.rs | 9 +- src/catalog/tests/local_catalog_tests.rs | 4 +- src/datanode/src/sql.rs | 7 +- src/datanode/src/sql/alter.rs | 10 +-- src/datanode/src/tests/instance_test.rs | 95 +++++++-------------- src/datanode/src/tests/test_util.rs | 24 +++++- src/frontend/src/catalog.rs | 14 +-- src/mito/src/engine.rs | 13 +-- src/query/src/datafusion/catalog_adapter.rs | 7 +- src/table/src/requests.rs | 6 ++ 16 files changed, 115 insertions(+), 180 deletions(-) diff --git a/src/catalog/src/lib.rs b/src/catalog/src/lib.rs index 0408c524a942..60ea456a7997 100644 --- a/src/catalog/src/lib.rs +++ b/src/catalog/src/lib.rs @@ -98,7 +98,7 @@ pub trait CatalogManager: CatalogList { async fn register_schema(&self, request: RegisterSchemaRequest) -> Result; /// Rename a table to [RenameTableRequest::new_table_name], returns whether the table is renamed. - async fn rename_table(&self, request: RenameTableRequest) -> Result; + async fn rename_table(&self, request: RenameTableRequest, table_id: TableId) -> Result; /// Register a system table, should be called before starting the manager. async fn register_system_table(&self, request: RegisterSystemTableRequest) @@ -145,27 +145,12 @@ impl Debug for RegisterTableRequest { } } -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct RenameTableRequest { pub catalog: String, pub schema: String, pub table_name: String, pub new_table_name: String, - pub table_id: TableId, - pub table: TableRef, -} - -impl Debug for RenameTableRequest { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - f.debug_struct("RenameTableRequest") - .field("catalog", &self.catalog) - .field("schema", &self.schema) - .field("table_name", &self.table_name) - .field("new_table_name", &self.new_table_name) - .field("table_id", &self.table_id) - .field("table", &self.table.table_info()) - .finish() - } } #[derive(Clone)] diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index c53141fb4702..3234f8ffdaa1 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -380,7 +380,7 @@ impl CatalogManager for LocalCatalogManager { } } - async fn rename_table(&self, request: RenameTableRequest) -> Result { + async fn rename_table(&self, request: RenameTableRequest, table_id: TableId) -> Result { let started = self.init_lock.lock().await; ensure!( @@ -411,12 +411,12 @@ impl CatalogManager for LocalCatalogManager { catalog_name.clone(), schema_name.clone(), request.new_table_name.clone(), - request.table_id, + table_id, ) .await?; - schema - .rename_table(&request.table_name, request.new_table_name, request.table) - .map(|v| v.is_none()) + Ok(schema + .rename_table(&request.table_name, request.new_table_name) + .is_ok()) } async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result { diff --git a/src/catalog/src/local/memory.rs b/src/catalog/src/local/memory.rs index 0627e42e5a46..e2eb72cf26a1 100644 --- a/src/catalog/src/local/memory.rs +++ b/src/catalog/src/local/memory.rs @@ -92,7 +92,7 @@ impl CatalogManager for MemoryCatalogManager { .map(|v| v.is_none()) } - async fn rename_table(&self, request: RenameTableRequest) -> Result { + async fn rename_table(&self, request: RenameTableRequest, _table_id: TableId) -> Result { let catalogs = self.catalogs.write().unwrap(); let catalog = catalogs .get(&request.catalog) @@ -106,9 +106,9 @@ impl CatalogManager for MemoryCatalogManager { catalog: &request.catalog, schema: &request.schema, })?; - schema - .rename_table(&request.table_name, request.new_table_name, request.table) - .map(|v| v.is_none()) + Ok(schema + .rename_table(&request.table_name, request.new_table_name) + .is_ok()) } async fn deregister_table(&self, request: DeregisterTableRequest) -> Result { @@ -312,25 +312,11 @@ impl SchemaProvider for MemorySchemaProvider { } } - fn rename_table( - &self, - name: &str, - new_name: String, - table: TableRef, - ) -> Result> { + fn rename_table(&self, name: &str, new_name: String) -> Result { let mut tables = self.tables.write().unwrap(); - if let Some(existing) = tables.get(name) { - // if table with the same name but different table id exists, then it's a fatal bug - if existing.table_info().ident.table_id != table.table_info().ident.table_id { - error!( - "Unexpected table rename: {:?}, existing: {:?}", - table.table_info(), - existing.table_info() - ); - return TableExistsSnafu { table: name }.fail()?; - } - tables.remove(name); - Ok(tables.insert(new_name, table)) + if tables.get(name).is_some() { + let table = tables.remove(name).unwrap(); + Ok(tables.insert(new_name, table).unwrap()) } else { TableNotFoundSnafu { table_info: name.to_string(), @@ -418,10 +404,9 @@ mod tests { // rename test table let new_table_name = "numbers"; - assert!(provider - .rename_table(table_name, new_table_name.to_string(), test_table.clone(),) - .unwrap() - .is_none()); + provider + .rename_table(table_name, new_table_name.to_string()) + .unwrap(); // test old table name not exist assert!(!provider.table_exist(table_name).unwrap()); @@ -438,7 +423,6 @@ mod tests { let other_table = Arc::new(NumbersTable::new(2)); let result = provider.register_table(new_table_name.to_string(), other_table); let err = result.err().unwrap(); - assert!(err.backtrace_opt().is_some()); assert_eq!(StatusCode::TableAlreadyExists, err.status_code()); } @@ -459,7 +443,7 @@ mod tests { schema: DEFAULT_SCHEMA_NAME.to_string(), table_name: table_name.to_string(), table_id, - table: table.clone(), + table, }; assert!(catalog.register_table(register_table_req).await.unwrap()); assert!(schema.table_exist(table_name).unwrap()); @@ -471,10 +455,11 @@ mod tests { schema: DEFAULT_SCHEMA_NAME.to_string(), table_name: table_name.to_string(), new_table_name: new_table_name.to_string(), - table_id, - table: table.clone(), }; - assert!(catalog.rename_table(rename_table_req).await.unwrap()); + assert!(catalog + .rename_table(rename_table_req, table_id) + .await + .unwrap()); assert!(!schema.table_exist(table_name).unwrap()); assert!(schema.table_exist(new_table_name).unwrap()); diff --git a/src/catalog/src/remote/manager.rs b/src/catalog/src/remote/manager.rs index ab247b6c7255..e57fa95ef9a6 100644 --- a/src/catalog/src/remote/manager.rs +++ b/src/catalog/src/remote/manager.rs @@ -449,9 +449,11 @@ impl CatalogManager for RemoteCatalogManager { Ok(true) } - async fn rename_table(&self, _request: RenameTableRequest) -> Result { - // todo impl rename_table for catalog manager - todo!() + async fn rename_table(&self, _request: RenameTableRequest, _table_id: TableId) -> Result { + UnimplementedSnafu { + operation: "rename table", + } + .fail() } async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> { @@ -744,13 +746,11 @@ impl SchemaProvider for RemoteSchemaProvider { prev } - fn rename_table( - &self, - _name: &str, - _new_name: String, - _table: TableRef, - ) -> Result> { - todo!() + fn rename_table(&self, _name: &str, _new_name: String) -> Result { + UnimplementedSnafu { + operation: "rename table", + } + .fail() } fn deregister_table(&self, name: &str) -> Result> { diff --git a/src/catalog/src/schema.rs b/src/catalog/src/schema.rs index 6d24d64a5a83..0dc9ddab88fe 100644 --- a/src/catalog/src/schema.rs +++ b/src/catalog/src/schema.rs @@ -37,12 +37,7 @@ pub trait SchemaProvider: Sync + Send { /// If supported by the implementation, renames an existing table from this schema and returns it. /// If no table of that name exists, returns "Table not found" error. - fn rename_table( - &self, - name: &str, - new_name: String, - table: TableRef, - ) -> Result>; + fn rename_table(&self, name: &str, new_name: String) -> Result; /// If supported by the implementation, removes an existing table from this schema and returns it. /// If no table of that name exists, returns Ok(None). diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index 8f043c83398e..c163979cb3bc 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -187,6 +187,7 @@ fn build_system_catalog_schema() -> Schema { } /// Formats key string for table entry in system catalog +#[inline] pub fn format_table_entry_key(catalog: &str, schema: &str, table_id: TableId) -> String { format!("{catalog}.{schema}.{table_id}") } @@ -309,7 +310,7 @@ pub fn decode_system_catalog( debug!("Table meta value: {}", String::from_utf8_lossy(value)); let table_meta: TableEntryValue = serde_json::from_slice(value).context(ValueDeserializeSnafu)?; - let table_id = table_parts[2].parse::().unwrap(); + let table_id = table_parts[2].parse::().unwrap(); Ok(Entry::Table(TableEntry { catalog_name: table_parts[0].to_string(), schema_name: table_parts[1].to_string(), diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs index 9a02af90383d..971b7524900c 100644 --- a/src/catalog/src/tables.rs +++ b/src/catalog/src/tables.rs @@ -231,13 +231,8 @@ impl SchemaProvider for InformationSchema { panic!("System catalog & schema does not support register table") } - fn rename_table( - &self, - _name: &str, - _new_name: String, - _table: TableRef, - ) -> crate::error::Result> { - panic!("System catalog & schema does not support rename table") + fn rename_table(&self, _name: &str, _new_name: String) -> crate::error::Result { + unimplemented!("System catalog & schema does not support rename table") } fn deregister_table(&self, _name: &str) -> crate::error::Result> { diff --git a/src/catalog/tests/local_catalog_tests.rs b/src/catalog/tests/local_catalog_tests.rs index 8c7f31c00570..840e10e8950e 100644 --- a/src/catalog/tests/local_catalog_tests.rs +++ b/src/catalog/tests/local_catalog_tests.rs @@ -62,11 +62,9 @@ mod tests { schema: DEFAULT_SCHEMA_NAME.to_string(), table_name: table_name.to_string(), new_table_name: new_table_name.to_string(), - table_id, - table: table.clone(), }; assert!(catalog_manager - .rename_table(rename_table_req) + .rename_table(rename_table_req, table_id) .await .unwrap()); diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index 760ad4c59da7..e885bcdab08d 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -209,12 +209,7 @@ mod tests { unimplemented!(); } - fn rename_table( - &self, - _name: &str, - _new_name: String, - _table: TableRef, - ) -> catalog::error::Result> { + fn rename_table(&self, _name: &str, _new_name: String) -> catalog::error::Result { unimplemented!() } diff --git a/src/datanode/src/sql/alter.rs b/src/datanode/src/sql/alter.rs index 5d5f321fe3d1..d7724ca7b99f 100644 --- a/src/datanode/src/sql/alter.rs +++ b/src/datanode/src/sql/alter.rs @@ -44,7 +44,7 @@ impl SqlHandler { table_name: &full_table_name, } ); - let kind = req.alter_kind.clone(); + let is_rename = req.is_rename_table(); let table = self.table_engine .alter_table(&ctx, req) @@ -52,18 +52,16 @@ impl SqlHandler { .context(error::AlterTableSnafu { table_name: full_table_name, })?; - let table_info = &table.table_info(); - if let AlterKind::RenameTable { .. } = kind { + if is_rename { + let table_info = &table.table_info(); let rename_table_req = RenameTableRequest { catalog: table_info.catalog_name.clone(), schema: table_info.schema_name.clone(), table_name, new_table_name: table_info.name.clone(), - table_id: table_info.ident.table_id, - table, }; self.catalog_manager - .rename_table(rename_table_req) + .rename_table(rename_table_req, table_info.ident.table_id) .await .context(error::RenameTableSnafu)?; } diff --git a/src/datanode/src/tests/instance_test.rs b/src/datanode/src/tests/instance_test.rs index 1fbc5723b3e5..4ee4c3a6278d 100644 --- a/src/datanode/src/tests/instance_test.rs +++ b/src/datanode/src/tests/instance_test.rs @@ -342,6 +342,31 @@ pub async fn test_execute_create() { assert!(matches!(output, Output::AffectedRows(0))); } +async fn check_show_tables_and_select(instance: &MockInstance) { + let output = execute_sql_in_db(instance, "show tables", "db").await; + let expect = "\ ++------------+ +| Tables | ++------------+ +| test_table | ++------------+\ +" + .to_string(); + check_output_stream(output, expect).await; + + let output = execute_sql_in_db(instance, "select * from test_table order by ts", "db").await; + let expected = "\ ++-------+-----+--------+---------------------+ +| host | cpu | memory | ts | ++-------+-----+--------+---------------------+ +| host1 | 1.1 | 100 | 1970-01-01T00:00:01 | +| host2 | 2.2 | 200 | 1970-01-01T00:00:02 | ++-------+-----+--------+---------------------+\ +" + .to_string(); + check_output_stream(output, expected).await; +} + #[tokio::test] async fn test_rename_table() { let instance = MockInstance::new("test_rename_table_local").await; @@ -357,21 +382,10 @@ async fn test_rename_table() { .await; assert!(matches!(output, Output::AffectedRows(0))); - let output = execute_sql_in_db(&instance, "show tables", "db").await; - let expect = "\ -+--------+ -| Tables | -+--------+ -| demo | -+--------+\ -" - .to_string(); - check_output_stream(output, expect).await; - // make sure table insertion is ok before altering table name let output = execute_sql_in_db( &instance, - "insert into demo(host, cpu, memory, ts) values ('host1', 1.1, 100, 1000)", + "insert into demo(host, cpu, memory, ts) values ('host1', 1.1, 100, 1000), ('host2', 2.2, 200, 2000)", "db", ) .await; @@ -381,63 +395,12 @@ async fn test_rename_table() { let output = execute_sql_in_db(&instance, "alter table demo rename test_table", "db").await; assert!(matches!(output, Output::AffectedRows(0))); - let output = execute_sql_in_db(&instance, "show tables", "db").await; - let expect = "\ -+------------+ -| Tables | -+------------+ -| test_table | -+------------+\ -" - .to_string(); - check_output_stream(output, expect).await; - - // make sure table insertion is ok after altered table name - let output = execute_sql_in_db( - &instance, - "insert into test_table(host, cpu, memory, ts) values ('host2', 2.2, 200, 2000)", - "db", - ) - .await; - assert!(matches!(output, Output::AffectedRows(1))); - - let output = execute_sql_in_db(&instance, "select * from test_table order by ts", "db").await; - let expected = "\ -+-------+-----+--------+---------------------+ -| host | cpu | memory | ts | -+-------+-----+--------+---------------------+ -| host1 | 1.1 | 100 | 1970-01-01T00:00:01 | -| host2 | 2.2 | 200 | 1970-01-01T00:00:02 | -+-------+-----+--------+---------------------+\ -" - .to_string(); - check_output_stream(output, expected).await; + check_show_tables_and_select(&instance).await; // restart instance - assert!(instance.restart().await.is_ok()); + let instance = MockInstance::create_by_options(instance.options()).await; - let output = execute_sql_in_db(&instance, "show tables", "db").await; - let expect = "\ -+------------+ -| Tables | -+------------+ -| test_table | -+------------+\ -" - .to_string(); - check_output_stream(output, expect).await; - - let output = execute_sql_in_db(&instance, "select * from test_table order by ts", "db").await; - let expected = "\ -+-------+-----+--------+---------------------+ -| host | cpu | memory | ts | -+-------+-----+--------+---------------------+ -| host1 | 1.1 | 100 | 1970-01-01T00:00:01 | -| host2 | 2.2 | 200 | 1970-01-01T00:00:02 | -+-------+-----+--------+---------------------+\ -" - .to_string(); - check_output_stream(output, expected).await; + check_show_tables_and_select(&instance).await; } #[tokio::test(flavor = "multi_thread")] diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 751c6d0cbaaa..419d37b3eacf 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -36,7 +36,8 @@ use crate::sql::SqlHandler; pub(crate) struct MockInstance { instance: Instance, - _guard: TestGuard, + opts: DatanodeOptions, + _guard: Option, } impl MockInstance { @@ -46,15 +47,30 @@ impl MockInstance { let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); instance.start().await.unwrap(); - MockInstance { instance, _guard } + MockInstance { + instance, + opts, + _guard: Some(_guard), + } + } + + pub(crate) async fn create_by_options(opts: DatanodeOptions) -> Self { + let instance = Instance::with_mock_meta_client(&opts).await.unwrap(); + instance.start().await.unwrap(); + + MockInstance { + instance, + opts, + _guard: None, + } } pub(crate) fn inner(&self) -> &Instance { &self.instance } - pub(crate) async fn restart(&self) -> Result<()> { - self.instance.start().await + pub(crate) fn options(&self) -> DatanodeOptions { + self.opts.clone() } } diff --git a/src/frontend/src/catalog.rs b/src/frontend/src/catalog.rs index 3681a36f57ac..ade0076deec4 100644 --- a/src/frontend/src/catalog.rs +++ b/src/frontend/src/catalog.rs @@ -30,6 +30,7 @@ use catalog::{ use futures::StreamExt; use meta_client::rpc::TableName; use snafu::prelude::*; +use table::metadata::TableId; use table::TableRef; use crate::datanode::DatanodeClients; @@ -97,7 +98,11 @@ impl CatalogManager for FrontendCatalogManager { unimplemented!() } - async fn rename_table(&self, _request: RenameTableRequest) -> catalog_err::Result { + async fn rename_table( + &self, + _request: RenameTableRequest, + _table_id: TableId, + ) -> catalog_err::Result { unimplemented!() } @@ -326,12 +331,7 @@ impl SchemaProvider for FrontendSchemaProvider { unimplemented!("Frontend schema provider does not support register table") } - fn rename_table( - &self, - _name: &str, - _new_name: String, - _table: TableRef, - ) -> catalog_err::Result> { + fn rename_table(&self, _name: &str, _new_name: String) -> catalog_err::Result { unimplemented!("Frontend schema provider does not support rename table") } diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index 52b64704a275..b837f168808d 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -525,9 +525,10 @@ impl MitoEngineInner { if let AlterKind::RenameTable { new_table_name } = &req.alter_kind { table_ref.table = new_table_name.as_str(); + let full_table_name = table_ref.to_string(); let mut tables = self.tables.write().unwrap(); - tables.remove(&table_ref.to_string()); - tables.insert(table_ref.to_string(), table.clone()); + tables.remove(&full_table_name); + tables.insert(full_table_name, table.clone()); } Ok(table) } @@ -1123,9 +1124,11 @@ mod tests { new_table_name: another_name.to_string(), }, }; - let ret = table_engine.alter_table(&ctx, req).await; - assert!(ret.is_err()); - assert!(matches!(ret, Err(e) if format!("{e:?}").contains("Table already exists"))); + let err = table_engine.alter_table(&ctx, req).await.err().unwrap(); + assert!( + err.to_string().contains("Table already exists"), + "Unexpected error: {err}" + ); let new_table_name = "test_table"; // test rename table diff --git a/src/query/src/datafusion/catalog_adapter.rs b/src/query/src/datafusion/catalog_adapter.rs index 3a176154a566..d5bfd7ad62c6 100644 --- a/src/query/src/datafusion/catalog_adapter.rs +++ b/src/query/src/datafusion/catalog_adapter.rs @@ -231,12 +231,7 @@ impl SchemaProvider for SchemaProviderAdapter { .map(|_| table)) } - fn rename_table( - &self, - _name: &str, - _new_name: String, - _table: TableRef, - ) -> catalog_error::Result> { + fn rename_table(&self, _name: &str, _new_name: String) -> catalog_error::Result { todo!() } diff --git a/src/table/src/requests.rs b/src/table/src/requests.rs index 780d26788f80..4d6a6b1b8746 100644 --- a/src/table/src/requests.rs +++ b/src/table/src/requests.rs @@ -70,6 +70,12 @@ pub struct AlterTableRequest { pub alter_kind: AlterKind, } +impl AlterTableRequest { + pub fn is_rename_table(&self) -> bool { + matches!(self.alter_kind, AlterKind::RenameTable { .. }) + } +} + /// Add column request #[derive(Debug, Clone)] pub struct AddColumnRequest {