Skip to content

Commit

Permalink
feat: support renaming table in the catalog manger (#824)
Browse files Browse the repository at this point in the history
* feat: support renaming table in the catalog manger

* feat: implement rename table for local catalog manager

* chore: fmt code

* fix: update system catalog when renaming table in local catalog manager

* chore: add instance test for rename table

* chore: fix frontend test

* chore: fix comment

* chore: fix rename table test

* fix: renaming a table with an existing name

* fix: improve the system catalog's renaming process

* chore: improve the code

* chore: improve the comment

Co-authored-by: Yingwen <[email protected]>

* chore: improve the code

* chore: fix tests

* chore: fix instance_test

* chore: improve the code

Co-authored-by: Yingwen <[email protected]>
  • Loading branch information
e1ijah1 and evenyag committed Jan 12, 2023
1 parent 5e89f1b commit 6775c5b
Show file tree
Hide file tree
Showing 22 changed files with 467 additions and 55 deletions.
12 changes: 12 additions & 0 deletions src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ pub trait CatalogManager: CatalogList {
/// schema registered.
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool>;

/// Rename a table to [RenameTableRequest::new_table_name], returns whether the table is renamed.
async fn rename_table(&self, request: RenameTableRequest) -> Result<bool>;

/// Register a system table, should be called before starting the manager.
async fn register_system_table(&self, request: RegisterSystemTableRequest)
-> error::Result<()>;
Expand Down Expand Up @@ -142,6 +145,15 @@ impl Debug for RegisterTableRequest {
}
}

#[derive(Debug, Clone)]
pub struct RenameTableRequest {
pub catalog: String,
pub schema: String,
pub table_name: String,
pub new_table_name: String,
pub table_id: TableId,
}

#[derive(Clone)]
pub struct DeregisterTableRequest {
pub catalog: String,
Expand Down
42 changes: 41 additions & 1 deletion src/catalog/src/local/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ use crate::tables::SystemCatalog;
use crate::{
format_full_table_name, handle_system_table_request, CatalogList, CatalogManager,
CatalogProvider, CatalogProviderRef, DeregisterTableRequest, RegisterSchemaRequest,
RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, SchemaProviderRef,
RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, SchemaProvider,
SchemaProviderRef,
};

/// A `CatalogManager` consists of a system catalog and a bunch of user catalogs.
Expand Down Expand Up @@ -379,6 +380,45 @@ impl CatalogManager for LocalCatalogManager {
}
}

async fn rename_table(&self, request: RenameTableRequest) -> Result<bool> {
let started = self.init_lock.lock().await;

ensure!(
*started,
IllegalManagerStateSnafu {
msg: "Catalog manager not started",
}
);

let catalog_name = &request.catalog;
let schema_name = &request.schema;

let catalog = self
.catalogs
.catalog(catalog_name)?
.context(CatalogNotFoundSnafu { catalog_name })?;

let schema = catalog
.schema(schema_name)?
.with_context(|| SchemaNotFoundSnafu {
catalog: catalog_name,
schema: schema_name,
})?;

// rename table in system catalog
self.system
.register_table(
catalog_name.clone(),
schema_name.clone(),
request.new_table_name.clone(),
request.table_id,
)
.await?;
Ok(schema
.rename_table(&request.table_name, request.new_table_name)
.is_ok())
}

async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result<bool> {
UnimplementedSnafu {
operation: "deregister table",
Expand Down
119 changes: 117 additions & 2 deletions src/catalog/src/local/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ use table::metadata::TableId;
use table::table::TableIdProvider;
use table::TableRef;

use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
use crate::error::{
CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, TableNotFoundSnafu,
};
use crate::schema::SchemaProvider;
use crate::{
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProviderRef,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
SchemaProviderRef,
};

/// Simple in-memory list of catalogs
Expand Down Expand Up @@ -89,6 +92,25 @@ impl CatalogManager for MemoryCatalogManager {
.map(|v| v.is_none())
}

async fn rename_table(&self, request: RenameTableRequest) -> Result<bool> {
let catalogs = self.catalogs.write().unwrap();
let catalog = catalogs
.get(&request.catalog)
.context(CatalogNotFoundSnafu {
catalog_name: &request.catalog,
})?
.clone();
let schema = catalog
.schema(&request.schema)?
.with_context(|| SchemaNotFoundSnafu {
catalog: &request.catalog,
schema: &request.schema,
})?;
Ok(schema
.rename_table(&request.table_name, request.new_table_name)
.is_ok())
}

async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<bool> {
let catalogs = self.catalogs.write().unwrap();
let catalog = catalogs
Expand Down Expand Up @@ -290,6 +312,20 @@ impl SchemaProvider for MemorySchemaProvider {
}
}

fn rename_table(&self, name: &str, new_name: String) -> Result<TableRef> {
let mut tables = self.tables.write().unwrap();
if tables.get(name).is_some() {
let table = tables.remove(name).unwrap();
tables.insert(new_name, table.clone());
Ok(table)
} else {
TableNotFoundSnafu {
table_info: name.to_string(),
}
.fail()?
}
}

fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
let mut tables = self.tables.write().unwrap();
Ok(tables.remove(name))
Expand Down Expand Up @@ -354,6 +390,85 @@ mod tests {
assert_eq!(StatusCode::TableAlreadyExists, err.status_code());
}

#[tokio::test]
async fn test_mem_provider_rename_table() {
let provider = MemorySchemaProvider::new();
let table_name = "num";
assert!(!provider.table_exist(table_name).unwrap());
let test_table: TableRef = Arc::new(NumbersTable::default());
// register test table
assert!(provider
.register_table(table_name.to_string(), test_table.clone())
.unwrap()
.is_none());
assert!(provider.table_exist(table_name).unwrap());

// rename test table
let new_table_name = "numbers";
provider
.rename_table(table_name, new_table_name.to_string())
.unwrap();

// test old table name not exist
assert!(!provider.table_exist(table_name).unwrap());
assert!(provider.deregister_table(table_name).unwrap().is_none());

// test new table name exists
assert!(provider.table_exist(new_table_name).unwrap());
let registered_table = provider.table(new_table_name).unwrap().unwrap();
assert_eq!(
registered_table.table_info().ident.table_id,
test_table.table_info().ident.table_id
);

let other_table = Arc::new(NumbersTable::new(2));
let result = provider.register_table(new_table_name.to_string(), other_table);
let err = result.err().unwrap();
assert_eq!(StatusCode::TableAlreadyExists, err.status_code());
}

#[tokio::test]
async fn test_catalog_rename_table() {
let catalog = MemoryCatalogManager::default();
let schema = catalog
.schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
.unwrap()
.unwrap();

// register table
let table_name = "num";
let table_id = 2333;
let table: TableRef = Arc::new(NumbersTable::new(table_id));
let register_table_req = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
table_id,
table,
};
assert!(catalog.register_table(register_table_req).await.unwrap());
assert!(schema.table_exist(table_name).unwrap());

// rename table
let new_table_name = "numbers";
let rename_table_req = RenameTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
new_table_name: new_table_name.to_string(),
table_id,
};
assert!(catalog.rename_table(rename_table_req).await.unwrap());
assert!(!schema.table_exist(table_name).unwrap());
assert!(schema.table_exist(new_table_name).unwrap());

let registered_table = catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name)
.unwrap()
.unwrap();
assert_eq!(registered_table.table_info().ident.table_id, table_id);
}

#[test]
pub fn test_register_if_absent() {
let list = MemoryCatalogManager::default();
Expand Down
16 changes: 15 additions & 1 deletion src/catalog/src/remote/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::remote::{Kv, KvBackendRef};
use crate::{
handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef,
DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest,
RegisterTableRequest, SchemaProvider, SchemaProviderRef,
RegisterTableRequest, RenameTableRequest, SchemaProvider, SchemaProviderRef,
};

/// Catalog manager based on metasrv.
Expand Down Expand Up @@ -449,6 +449,13 @@ impl CatalogManager for RemoteCatalogManager {
Ok(true)
}

async fn rename_table(&self, _request: RenameTableRequest) -> Result<bool> {
UnimplementedSnafu {
operation: "rename table",
}
.fail()
}

async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
let mut requests = self.system_table_requests.lock().await;
requests.push(request);
Expand Down Expand Up @@ -739,6 +746,13 @@ impl SchemaProvider for RemoteSchemaProvider {
prev
}

fn rename_table(&self, _name: &str, _new_name: String) -> Result<TableRef> {
UnimplementedSnafu {
operation: "rename table",
}
.fail()
}

fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
let table_name = name.to_string();
let table_key = self.build_regional_table_key(&table_name).to_string();
Expand Down
4 changes: 4 additions & 0 deletions src/catalog/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ pub trait SchemaProvider: Sync + Send {
/// If a table of the same name existed before, it returns "Table already exists" error.
fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>>;

/// If supported by the implementation, renames an existing table from this schema and returns it.
/// If no table of that name exists, returns "Table not found" error.
fn rename_table(&self, name: &str, new_name: String) -> Result<TableRef>;

/// If supported by the implementation, removes an existing table from this schema and returns it.
/// If no table of that name exists, returns Ok(None).
fn deregister_table(&self, name: &str) -> Result<Option<TableRef>>;
Expand Down
35 changes: 24 additions & 11 deletions src/catalog/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,23 @@ fn build_system_catalog_schema() -> Schema {
SchemaBuilder::try_from(cols).unwrap().build().unwrap()
}

pub fn build_table_insert_request(full_table_name: String, table_id: TableId) -> InsertRequest {
/// Formats key string for table entry in system catalog
#[inline]
pub fn format_table_entry_key(catalog: &str, schema: &str, table_id: TableId) -> String {
format!("{catalog}.{schema}.{table_id}")
}

pub fn build_table_insert_request(
catalog: String,
schema: String,
table_name: String,
table_id: TableId,
) -> InsertRequest {
let entry_key = format_table_entry_key(&catalog, &schema, table_id);
build_insert_request(
EntryType::Table,
full_table_name.as_bytes(),
serde_json::to_string(&TableEntryValue { table_id })
entry_key.as_bytes(),
serde_json::to_string(&TableEntryValue { table_name })
.unwrap()
.as_bytes(),
)
Expand Down Expand Up @@ -285,8 +297,8 @@ pub fn decode_system_catalog(
}

EntryType::Table => {
// As for table entry, the key is a string with format: `<catalog_name>.<schema_name>.<table_name>`
// and the value is a JSON string with format: `{"table_id": <table_id>}`
// As for table entry, the key is a string with format: `<catalog_name>.<schema_name>.<table_id>`
// and the value is a JSON string with format: `{"table_name": <table_name>}`
let table_parts = key.split('.').collect::<Vec<_>>();
ensure!(
table_parts.len() >= 3,
Expand All @@ -298,11 +310,12 @@ pub fn decode_system_catalog(
debug!("Table meta value: {}", String::from_utf8_lossy(value));
let table_meta: TableEntryValue =
serde_json::from_slice(value).context(ValueDeserializeSnafu)?;
let table_id = table_parts[2].parse::<TableId>().unwrap();
Ok(Entry::Table(TableEntry {
catalog_name: table_parts[0].to_string(),
schema_name: table_parts[1].to_string(),
table_name: table_parts[2].to_string(),
table_id: table_meta.table_id,
table_name: table_meta.table_name,
table_id,
}))
}
}
Expand Down Expand Up @@ -362,7 +375,7 @@ pub struct TableEntry {

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct TableEntryValue {
pub table_id: TableId,
pub table_name: String,
}

#[cfg(test)]
Expand Down Expand Up @@ -415,8 +428,8 @@ mod tests {
pub fn test_decode_table() {
let entry = decode_system_catalog(
Some(EntryType::Table as u8),
Some("some_catalog.some_schema.some_table".as_bytes()),
Some("{\"table_id\":42}".as_bytes()),
Some("some_catalog.some_schema.42".as_bytes()),
Some("{\"table_name\":\"some_table\"}".as_bytes()),
)
.unwrap();

Expand All @@ -435,7 +448,7 @@ mod tests {
pub fn test_decode_mismatch() {
decode_system_catalog(
Some(EntryType::Table as u8),
Some("some_catalog.some_schema.some_table".as_bytes()),
Some("some_catalog.some_schema.42".as_bytes()),
None,
)
.unwrap();
Expand Down
11 changes: 6 additions & 5 deletions src/catalog/src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ use table::{Table, TableRef};

use crate::error::{Error, InsertCatalogRecordSnafu};
use crate::system::{build_schema_insert_request, build_table_insert_request, SystemCatalogTable};
use crate::{
format_full_table_name, CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef,
};
use crate::{CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef};

/// Tables holds all tables created by user.
pub struct Tables {
Expand Down Expand Up @@ -233,6 +231,10 @@ impl SchemaProvider for InformationSchema {
panic!("System catalog & schema does not support register table")
}

fn rename_table(&self, _name: &str, _new_name: String) -> crate::error::Result<TableRef> {
unimplemented!("System catalog & schema does not support rename table")
}

fn deregister_table(&self, _name: &str) -> crate::error::Result<Option<TableRef>> {
panic!("System catalog & schema does not support deregister table")
}
Expand Down Expand Up @@ -269,8 +271,7 @@ impl SystemCatalog {
table_name: String,
table_id: TableId,
) -> crate::error::Result<usize> {
let full_table_name = format_full_table_name(&catalog, &schema, &table_name);
let request = build_table_insert_request(full_table_name, table_id);
let request = build_table_insert_request(catalog, schema, table_name, table_id);
self.information_schema
.system
.insert(request)
Expand Down
Loading

0 comments on commit 6775c5b

Please sign in to comment.