Skip to content

Commit

Permalink
fix: improve the system catalog's renaming process
Browse files Browse the repository at this point in the history
  • Loading branch information
e1ijah1 committed Jan 11, 2023
1 parent 73dd31f commit ab3a269
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 139 deletions.
4 changes: 2 additions & 2 deletions src/catalog/src/local/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -405,11 +405,11 @@ impl CatalogManager for LocalCatalogManager {
schema: schema_name,
})?;

// rename table in system catalog
self.system
.rename_table(
.register_table(
catalog_name.clone(),
schema_name.clone(),
request.table_name.clone(),
request.new_table_name.clone(),
request.table_id,
)
Expand Down
90 changes: 27 additions & 63 deletions src/catalog/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use serde::{Deserialize, Serialize};
use snafu::{ensure, OptionExt, ResultExt};
use table::engine::{EngineContext, TableEngineRef};
use table::metadata::{TableId, TableInfoRef};
use table::requests::{CreateTableRequest, DeleteRequest, InsertRequest, OpenTableRequest};
use table::requests::{CreateTableRequest, InsertRequest, OpenTableRequest};
use table::{Table, TableRef};

use crate::error::{
Expand Down Expand Up @@ -73,11 +73,6 @@ impl Table for SystemCatalogTable {
self.table.insert(request).await
}

/// Delete row from table
async fn delete(&self, request: DeleteRequest) -> table::Result<usize> {
self.table.delete(request).await
}

fn table_info(&self) -> TableInfoRef {
self.table_info.clone()
}
Expand Down Expand Up @@ -173,29 +168,40 @@ fn build_system_catalog_schema() -> Schema {
ColumnSchema::new(
"value".to_string(),
ConcreteDataType::binary_datatype(),
true,
false,
),
ColumnSchema::new(
"gmt_created".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
true,
false,
),
ColumnSchema::new(
"gmt_modified".to_string(),
ConcreteDataType::timestamp_millisecond_datatype(),
true,
false,
),
];

// The schema of this table must be valid.
SchemaBuilder::try_from(cols).unwrap().build().unwrap()
}

pub fn build_table_insert_request(full_table_name: String, table_id: TableId) -> InsertRequest {
/// Formats key string for table entry in system catalog
pub fn format_table_entry_key(catalog: &str, schema: &str, table_id: TableId) -> String {
format!("{catalog}.{schema}.{table_id}")
}

pub fn build_table_insert_request(
catalog: String,
schema: String,
table_name: String,
table_id: TableId,
) -> InsertRequest {
let entry_key = format_table_entry_key(&catalog, &schema, table_id);
build_insert_request(
EntryType::Table,
full_table_name.as_bytes(),
serde_json::to_string(&TableEntryValue { table_id })
entry_key.as_bytes(),
serde_json::to_string(&TableEntryValue { table_name })
.unwrap()
.as_bytes(),
)
Expand All @@ -212,27 +218,6 @@ pub fn build_schema_insert_request(catalog_name: String, schema_name: String) ->
)
}

pub fn build_table_delete_request(full_table_name: String) -> DeleteRequest {
build_delete_request(EntryType::Table, full_table_name.as_bytes())
}

pub fn build_delete_request(entry_type: EntryType, key: &[u8]) -> DeleteRequest {
let mut key_column_values = HashMap::with_capacity(3);
key_column_values.insert(
"entry_type".to_string(),
Arc::new(UInt8Vector::from_slice(&[entry_type as u8])) as _,
);
key_column_values.insert(
"key".to_string(),
Arc::new(BinaryVector::from_slice(&[key])) as _,
);
key_column_values.insert(
"timestamp".to_string(),
Arc::new(TimestampMillisecondVector::from_slice(&[0])) as _,
);
DeleteRequest { key_column_values }
}

pub fn build_insert_request(entry_type: EntryType, key: &[u8], value: &[u8]) -> InsertRequest {
let mut columns_values = HashMap::with_capacity(6);
columns_values.insert(
Expand Down Expand Up @@ -311,8 +296,8 @@ pub fn decode_system_catalog(
}

EntryType::Table => {
// As for table entry, the key is a string with format: `<catalog_name>.<schema_name>.<table_name>`
// and the value is a JSON string with format: `{"table_id": <table_id>}`
// As for table entry, the key is a string with format: `<catalog_name>.<schema_name>.<table_id>`
// and the value is a JSON string with format: `{"table_name": <table_name>}`
let table_parts = key.split('.').collect::<Vec<_>>();
ensure!(
table_parts.len() >= 3,
Expand All @@ -324,11 +309,12 @@ pub fn decode_system_catalog(
debug!("Table meta value: {}", String::from_utf8_lossy(value));
let table_meta: TableEntryValue =
serde_json::from_slice(value).context(ValueDeserializeSnafu)?;
let table_id = table_parts[2].parse::<u32>().unwrap();
Ok(Entry::Table(TableEntry {
catalog_name: table_parts[0].to_string(),
schema_name: table_parts[1].to_string(),
table_name: table_parts[2].to_string(),
table_id: table_meta.table_id,
table_name: table_meta.table_name,
table_id,
}))
}
}
Expand Down Expand Up @@ -388,7 +374,7 @@ pub struct TableEntry {

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct TableEntryValue {
pub table_id: TableId,
pub table_name: String,
}

#[cfg(test)]
Expand All @@ -404,8 +390,6 @@ mod tests {
use tempdir::TempDir;

use super::*;
use crate::error::DeleteCatalogRecordSnafu;
use crate::format_full_table_name;

#[test]
pub fn test_decode_catalog_entry() {
Expand Down Expand Up @@ -443,8 +427,8 @@ mod tests {
pub fn test_decode_table() {
let entry = decode_system_catalog(
Some(EntryType::Table as u8),
Some("some_catalog.some_schema.some_table".as_bytes()),
Some("{\"table_id\":42}".as_bytes()),
Some("some_catalog.some_schema.42".as_bytes()),
Some("{\"table_name\":\"some_table\"}".as_bytes()),
)
.unwrap();

Expand All @@ -463,7 +447,7 @@ mod tests {
pub fn test_decode_mismatch() {
decode_system_catalog(
Some(EntryType::Table as u8),
Some("some_catalog.some_schema.some_table".as_bytes()),
Some("some_catalog.some_schema.42".as_bytes()),
None,
)
.unwrap();
Expand Down Expand Up @@ -515,24 +499,4 @@ mod tests {
assert_eq!(SYSTEM_CATALOG_NAME, info.catalog_name);
assert_eq!(INFORMATION_SCHEMA_NAME, info.schema_name);
}

#[tokio::test]
async fn test_system_table_delete() {
let (_dir, table_engine) = prepare_table_engine().await;
let system_table = SystemCatalogTable::new(table_engine).await.unwrap();
let table_name = "test_table";
let table_id = 42;
let full_table_name =
format_full_table_name(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, table_name);
let insert_req = build_table_insert_request(full_table_name.clone(), table_id);

assert!(system_table.insert(insert_req).await.is_ok());

let delete_req = build_table_delete_request(full_table_name);
let ret = system_table
.delete(delete_req)
.await
.context(DeleteCatalogRecordSnafu);
assert!(ret.is_ok());
}
}
78 changes: 4 additions & 74 deletions src/catalog/src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,9 @@ use table::metadata::{TableId, TableInfoRef};
use table::table::scan::SimpleTableScan;
use table::{Table, TableRef};

use crate::error::{DeleteCatalogRecordSnafu, Error, InsertCatalogRecordSnafu};
use crate::system::{
build_schema_insert_request, build_table_delete_request, build_table_insert_request,
SystemCatalogTable,
};
use crate::{
format_full_table_name, CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef,
};
use crate::error::{Error, InsertCatalogRecordSnafu};
use crate::system::{build_schema_insert_request, build_table_insert_request, SystemCatalogTable};
use crate::{CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef};

/// Tables holds all tables created by user.
pub struct Tables {
Expand Down Expand Up @@ -281,35 +276,14 @@ impl SystemCatalog {
table_name: String,
table_id: TableId,
) -> crate::error::Result<usize> {
let full_table_name = format_full_table_name(&catalog, &schema, &table_name);
let request = build_table_insert_request(full_table_name, table_id);
let request = build_table_insert_request(catalog, schema, table_name, table_id);
self.information_schema
.system
.insert(request)
.await
.context(InsertCatalogRecordSnafu)
}

pub async fn rename_table(
&self,
catalog: String,
schema: String,
table_name: String,
new_table_name: String,
table_id: TableId,
) -> crate::error::Result<usize> {
let full_table_name = format_full_table_name(&catalog, &schema, &table_name);
let delete_table_req = build_table_delete_request(full_table_name);
self.information_schema
.system
.delete(delete_table_req)
.await
.context(DeleteCatalogRecordSnafu)?;

self.register_table(catalog, schema, new_table_name, table_id)
.await
}

pub async fn register_schema(
&self,
catalog: String,
Expand Down Expand Up @@ -381,8 +355,6 @@ mod tests {
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
use common_query::physical_plan::SessionContext;
use futures_util::StreamExt;
use mito::config::EngineConfig;
use mito::table::test_util::{new_test_object_store, MockEngine, MockMitoEngine};
use table::table::numbers::NumbersTable;

use super::*;
Expand Down Expand Up @@ -451,46 +423,4 @@ mod tests {
panic!("Record batch should not be empty!")
}
}

async fn create_system_catalog() -> Result<SystemCatalog, Error> {
let (_dir, object_store) = new_test_object_store("setup_mock_engine_and_table").await;
let mock_engine = Arc::new(MockMitoEngine::new(
EngineConfig::default(),
MockEngine::default(),
object_store,
));
let table = SystemCatalogTable::new(mock_engine.clone()).await?;
let memory_catalog_list = new_memory_catalog_list().unwrap();
Ok(SystemCatalog::new(table, memory_catalog_list, mock_engine))
}

#[tokio::test]
async fn test_rename_table() {
let system_catalog = create_system_catalog().await.unwrap();
let catalog = DEFAULT_CATALOG_NAME.to_string();
let schema = DEFAULT_SCHEMA_NAME.to_string();
let table_name = "test_table";
let table_id = 42;
assert!(system_catalog
.register_table(
catalog.clone(),
schema.clone(),
table_name.to_string(),
table_id
)
.await
.is_ok());

let new_table_name = "demo";
let ret = system_catalog
.rename_table(
catalog.clone(),
schema.clone(),
table_name.to_string(),
new_table_name.to_string(),
table_id,
)
.await;
assert!(ret.is_ok());
}
}

0 comments on commit ab3a269

Please sign in to comment.