diff --git a/src/catalog/src/local/manager.rs b/src/catalog/src/local/manager.rs index 6682864e4d13..c53141fb4702 100644 --- a/src/catalog/src/local/manager.rs +++ b/src/catalog/src/local/manager.rs @@ -405,11 +405,11 @@ impl CatalogManager for LocalCatalogManager { schema: schema_name, })?; + // rename table in system catalog self.system - .rename_table( + .register_table( catalog_name.clone(), schema_name.clone(), - request.table_name.clone(), request.new_table_name.clone(), request.table_id, ) diff --git a/src/catalog/src/system.rs b/src/catalog/src/system.rs index 8a45ab8db439..8f043c83398e 100644 --- a/src/catalog/src/system.rs +++ b/src/catalog/src/system.rs @@ -32,7 +32,7 @@ use serde::{Deserialize, Serialize}; use snafu::{ensure, OptionExt, ResultExt}; use table::engine::{EngineContext, TableEngineRef}; use table::metadata::{TableId, TableInfoRef}; -use table::requests::{CreateTableRequest, DeleteRequest, InsertRequest, OpenTableRequest}; +use table::requests::{CreateTableRequest, InsertRequest, OpenTableRequest}; use table::{Table, TableRef}; use crate::error::{ @@ -73,11 +73,6 @@ impl Table for SystemCatalogTable { self.table.insert(request).await } - /// Delete row from table - async fn delete(&self, request: DeleteRequest) -> table::Result { - self.table.delete(request).await - } - fn table_info(&self) -> TableInfoRef { self.table_info.clone() } @@ -173,17 +168,17 @@ fn build_system_catalog_schema() -> Schema { ColumnSchema::new( "value".to_string(), ConcreteDataType::binary_datatype(), - true, + false, ), ColumnSchema::new( "gmt_created".to_string(), ConcreteDataType::timestamp_millisecond_datatype(), - true, + false, ), ColumnSchema::new( "gmt_modified".to_string(), ConcreteDataType::timestamp_millisecond_datatype(), - true, + false, ), ]; @@ -191,11 +186,22 @@ fn build_system_catalog_schema() -> Schema { SchemaBuilder::try_from(cols).unwrap().build().unwrap() } -pub fn build_table_insert_request(full_table_name: String, table_id: TableId) -> InsertRequest { +/// Formats key string for table entry in system catalog +pub fn format_table_entry_key(catalog: &str, schema: &str, table_id: TableId) -> String { + format!("{catalog}.{schema}.{table_id}") +} + +pub fn build_table_insert_request( + catalog: String, + schema: String, + table_name: String, + table_id: TableId, +) -> InsertRequest { + let entry_key = format_table_entry_key(&catalog, &schema, table_id); build_insert_request( EntryType::Table, - full_table_name.as_bytes(), - serde_json::to_string(&TableEntryValue { table_id }) + entry_key.as_bytes(), + serde_json::to_string(&TableEntryValue { table_name }) .unwrap() .as_bytes(), ) @@ -212,27 +218,6 @@ pub fn build_schema_insert_request(catalog_name: String, schema_name: String) -> ) } -pub fn build_table_delete_request(full_table_name: String) -> DeleteRequest { - build_delete_request(EntryType::Table, full_table_name.as_bytes()) -} - -pub fn build_delete_request(entry_type: EntryType, key: &[u8]) -> DeleteRequest { - let mut key_column_values = HashMap::with_capacity(3); - key_column_values.insert( - "entry_type".to_string(), - Arc::new(UInt8Vector::from_slice(&[entry_type as u8])) as _, - ); - key_column_values.insert( - "key".to_string(), - Arc::new(BinaryVector::from_slice(&[key])) as _, - ); - key_column_values.insert( - "timestamp".to_string(), - Arc::new(TimestampMillisecondVector::from_slice(&[0])) as _, - ); - DeleteRequest { key_column_values } -} - pub fn build_insert_request(entry_type: EntryType, key: &[u8], value: &[u8]) -> InsertRequest { let mut columns_values = HashMap::with_capacity(6); columns_values.insert( @@ -311,8 +296,8 @@ pub fn decode_system_catalog( } EntryType::Table => { - // As for table entry, the key is a string with format: `..` - // and the value is a JSON string with format: `{"table_id": }` + // As for table entry, the key is a string with format: `..` + // and the value is a JSON string with format: `{"table_name": }` let table_parts = key.split('.').collect::>(); ensure!( table_parts.len() >= 3, @@ -324,11 +309,12 @@ pub fn decode_system_catalog( debug!("Table meta value: {}", String::from_utf8_lossy(value)); let table_meta: TableEntryValue = serde_json::from_slice(value).context(ValueDeserializeSnafu)?; + let table_id = table_parts[2].parse::().unwrap(); Ok(Entry::Table(TableEntry { catalog_name: table_parts[0].to_string(), schema_name: table_parts[1].to_string(), - table_name: table_parts[2].to_string(), - table_id: table_meta.table_id, + table_name: table_meta.table_name, + table_id, })) } } @@ -388,7 +374,7 @@ pub struct TableEntry { #[derive(Debug, Serialize, Deserialize, PartialEq, Eq)] pub struct TableEntryValue { - pub table_id: TableId, + pub table_name: String, } #[cfg(test)] @@ -404,8 +390,6 @@ mod tests { use tempdir::TempDir; use super::*; - use crate::error::DeleteCatalogRecordSnafu; - use crate::format_full_table_name; #[test] pub fn test_decode_catalog_entry() { @@ -443,8 +427,8 @@ mod tests { pub fn test_decode_table() { let entry = decode_system_catalog( Some(EntryType::Table as u8), - Some("some_catalog.some_schema.some_table".as_bytes()), - Some("{\"table_id\":42}".as_bytes()), + Some("some_catalog.some_schema.42".as_bytes()), + Some("{\"table_name\":\"some_table\"}".as_bytes()), ) .unwrap(); @@ -463,7 +447,7 @@ mod tests { pub fn test_decode_mismatch() { decode_system_catalog( Some(EntryType::Table as u8), - Some("some_catalog.some_schema.some_table".as_bytes()), + Some("some_catalog.some_schema.42".as_bytes()), None, ) .unwrap(); @@ -515,24 +499,4 @@ mod tests { assert_eq!(SYSTEM_CATALOG_NAME, info.catalog_name); assert_eq!(INFORMATION_SCHEMA_NAME, info.schema_name); } - - #[tokio::test] - async fn test_system_table_delete() { - let (_dir, table_engine) = prepare_table_engine().await; - let system_table = SystemCatalogTable::new(table_engine).await.unwrap(); - let table_name = "test_table"; - let table_id = 42; - let full_table_name = - format_full_table_name(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name); - let insert_req = build_table_insert_request(full_table_name.clone(), table_id); - - assert!(system_table.insert(insert_req).await.is_ok()); - - let delete_req = build_table_delete_request(full_table_name); - let ret = system_table - .delete(delete_req) - .await - .context(DeleteCatalogRecordSnafu); - assert!(ret.is_ok()); - } } diff --git a/src/catalog/src/tables.rs b/src/catalog/src/tables.rs index 9e64e51e7c42..9a02af90383d 100644 --- a/src/catalog/src/tables.rs +++ b/src/catalog/src/tables.rs @@ -38,14 +38,9 @@ use table::metadata::{TableId, TableInfoRef}; use table::table::scan::SimpleTableScan; use table::{Table, TableRef}; -use crate::error::{DeleteCatalogRecordSnafu, Error, InsertCatalogRecordSnafu}; -use crate::system::{ - build_schema_insert_request, build_table_delete_request, build_table_insert_request, - SystemCatalogTable, -}; -use crate::{ - format_full_table_name, CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef, -}; +use crate::error::{Error, InsertCatalogRecordSnafu}; +use crate::system::{build_schema_insert_request, build_table_insert_request, SystemCatalogTable}; +use crate::{CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef}; /// Tables holds all tables created by user. pub struct Tables { @@ -281,8 +276,7 @@ impl SystemCatalog { table_name: String, table_id: TableId, ) -> crate::error::Result { - let full_table_name = format_full_table_name(&catalog, &schema, &table_name); - let request = build_table_insert_request(full_table_name, table_id); + let request = build_table_insert_request(catalog, schema, table_name, table_id); self.information_schema .system .insert(request) @@ -290,26 +284,6 @@ impl SystemCatalog { .context(InsertCatalogRecordSnafu) } - pub async fn rename_table( - &self, - catalog: String, - schema: String, - table_name: String, - new_table_name: String, - table_id: TableId, - ) -> crate::error::Result { - let full_table_name = format_full_table_name(&catalog, &schema, &table_name); - let delete_table_req = build_table_delete_request(full_table_name); - self.information_schema - .system - .delete(delete_table_req) - .await - .context(DeleteCatalogRecordSnafu)?; - - self.register_table(catalog, schema, new_table_name, table_id) - .await - } - pub async fn register_schema( &self, catalog: String, @@ -381,8 +355,6 @@ mod tests { use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME}; use common_query::physical_plan::SessionContext; use futures_util::StreamExt; - use mito::config::EngineConfig; - use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine}; use table::table::numbers::NumbersTable; use super::*; @@ -451,46 +423,4 @@ mod tests { panic!("Record batch should not be empty!") } } - - async fn create_system_catalog() -> Result { - let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await; - let mock_engine = Arc::new(MockMitoEngine::new( - EngineConfig::default(), - MockEngine::default(), - object_store, - )); - let table = SystemCatalogTable::new(mock_engine.clone()).await?; - let memory_catalog_list = new_memory_catalog_list().unwrap(); - Ok(SystemCatalog::new(table, memory_catalog_list, mock_engine)) - } - - #[tokio::test] - async fn test_rename_table() { - let system_catalog = create_system_catalog().await.unwrap(); - let catalog = DEFAULT_CATALOG_NAME.to_string(); - let schema = DEFAULT_SCHEMA_NAME.to_string(); - let table_name = "test_table"; - let table_id = 42; - assert!(system_catalog - .register_table( - catalog.clone(), - schema.clone(), - table_name.to_string(), - table_id - ) - .await - .is_ok()); - - let new_table_name = "demo"; - let ret = system_catalog - .rename_table( - catalog.clone(), - schema.clone(), - table_name.to_string(), - new_table_name.to_string(), - table_id, - ) - .await; - assert!(ret.is_ok()); - } }