Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support renaming table in the catalog manger #824

Merged
merged 16 commits into from
Jan 12, 2023
Merged
11 changes: 11 additions & 0 deletions src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ pub trait CatalogManager: CatalogList {
/// schema registered.
async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool>;

/// Rename a table to [RenameTableRequest::new_table_name], returns whether the table is renamed.
async fn rename_table(&self, request: RenameTableRequest, table_id: TableId) -> Result<bool>;
e1ijah1 marked this conversation as resolved.
Show resolved Hide resolved

/// Register a system table, should be called before starting the manager.
async fn register_system_table(&self, request: RegisterSystemTableRequest)
-> error::Result<()>;
Expand Down Expand Up @@ -142,6 +145,14 @@ impl Debug for RegisterTableRequest {
}
}

#[derive(Debug, Clone)]
pub struct RenameTableRequest {
pub catalog: String,
pub schema: String,
pub table_name: String,
pub new_table_name: String,
}

#[derive(Clone)]
pub struct DeregisterTableRequest {
pub catalog: String,
Expand Down
42 changes: 41 additions & 1 deletion src/catalog/src/local/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ use crate::tables::SystemCatalog;
use crate::{
format_full_table_name, handle_system_table_request, CatalogList, CatalogManager,
CatalogProvider, CatalogProviderRef, DeregisterTableRequest, RegisterSchemaRequest,
RegisterSystemTableRequest, RegisterTableRequest, SchemaProvider, SchemaProviderRef,
RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest, SchemaProvider,
SchemaProviderRef,
};

/// A `CatalogManager` consists of a system catalog and a bunch of user catalogs.
Expand Down Expand Up @@ -379,6 +380,45 @@ impl CatalogManager for LocalCatalogManager {
}
}

async fn rename_table(&self, request: RenameTableRequest, table_id: TableId) -> Result<bool> {
let started = self.init_lock.lock().await;

ensure!(
*started,
IllegalManagerStateSnafu {
msg: "Catalog manager not started",
}
);

let catalog_name = &request.catalog;
let schema_name = &request.schema;

let catalog = self
.catalogs
.catalog(catalog_name)?
.context(CatalogNotFoundSnafu { catalog_name })?;

let schema = catalog
.schema(schema_name)?
.with_context(|| SchemaNotFoundSnafu {
catalog: catalog_name,
schema: schema_name,
})?;

// rename table in system catalog
self.system
.register_table(
catalog_name.clone(),
schema_name.clone(),
request.new_table_name.clone(),
table_id,
)
.await?;
Ok(schema
.rename_table(&request.table_name, request.new_table_name)
.is_ok())
}

async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result<bool> {
UnimplementedSnafu {
operation: "deregister table",
Expand Down
121 changes: 119 additions & 2 deletions src/catalog/src/local/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ use table::metadata::TableId;
use table::table::TableIdProvider;
use table::TableRef;

use crate::error::{CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu};
use crate::error::{
CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, TableNotFoundSnafu,
};
use crate::schema::SchemaProvider;
use crate::{
CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef, DeregisterTableRequest,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, SchemaProviderRef,
RegisterSchemaRequest, RegisterSystemTableRequest, RegisterTableRequest, RenameTableRequest,
SchemaProviderRef,
};

/// Simple in-memory list of catalogs
Expand Down Expand Up @@ -89,6 +92,25 @@ impl CatalogManager for MemoryCatalogManager {
.map(|v| v.is_none())
}

async fn rename_table(&self, request: RenameTableRequest, _table_id: TableId) -> Result<bool> {
let catalogs = self.catalogs.write().unwrap();
let catalog = catalogs
.get(&request.catalog)
.context(CatalogNotFoundSnafu {
catalog_name: &request.catalog,
})?
.clone();
let schema = catalog
.schema(&request.schema)?
.with_context(|| SchemaNotFoundSnafu {
catalog: &request.catalog,
schema: &request.schema,
})?;
Ok(schema
.rename_table(&request.table_name, request.new_table_name)
.is_ok())
}

async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<bool> {
let catalogs = self.catalogs.write().unwrap();
let catalog = catalogs
Expand Down Expand Up @@ -290,6 +312,20 @@ impl SchemaProvider for MemorySchemaProvider {
}
}

fn rename_table(&self, name: &str, new_name: String) -> Result<TableRef> {
let mut tables = self.tables.write().unwrap();
if tables.get(name).is_some() {
let table = tables.remove(name).unwrap();
tables.insert(new_name, table.clone());
Ok(table)
} else {
TableNotFoundSnafu {
table_info: name.to_string(),
}
.fail()?
}
}

fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
let mut tables = self.tables.write().unwrap();
Ok(tables.remove(name))
Expand Down Expand Up @@ -354,6 +390,87 @@ mod tests {
assert_eq!(StatusCode::TableAlreadyExists, err.status_code());
}

#[tokio::test]
async fn test_mem_provider_rename_table() {
let provider = MemorySchemaProvider::new();
let table_name = "num";
assert!(!provider.table_exist(table_name).unwrap());
let test_table: TableRef = Arc::new(NumbersTable::default());
// register test table
assert!(provider
.register_table(table_name.to_string(), test_table.clone())
.unwrap()
.is_none());
assert!(provider.table_exist(table_name).unwrap());

// rename test table
let new_table_name = "numbers";
provider
.rename_table(table_name, new_table_name.to_string())
.unwrap();

// test old table name not exist
assert!(!provider.table_exist(table_name).unwrap());
assert!(provider.deregister_table(table_name).unwrap().is_none());

// test new table name exists
assert!(provider.table_exist(new_table_name).unwrap());
let registered_table = provider.table(new_table_name).unwrap().unwrap();
assert_eq!(
registered_table.table_info().ident.table_id,
test_table.table_info().ident.table_id
);

let other_table = Arc::new(NumbersTable::new(2));
let result = provider.register_table(new_table_name.to_string(), other_table);
let err = result.err().unwrap();
assert_eq!(StatusCode::TableAlreadyExists, err.status_code());
}

#[tokio::test]
async fn test_catalog_rename_table() {
let catalog = MemoryCatalogManager::default();
let schema = catalog
.schema(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME)
.unwrap()
.unwrap();

// register table
let table_name = "num";
let table_id = 2333;
let table: TableRef = Arc::new(NumbersTable::new(table_id));
let register_table_req = RegisterTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
table_id,
table,
};
assert!(catalog.register_table(register_table_req).await.unwrap());
assert!(schema.table_exist(table_name).unwrap());

// rename table
let new_table_name = "numbers";
let rename_table_req = RenameTableRequest {
catalog: DEFAULT_CATALOG_NAME.to_string(),
schema: DEFAULT_SCHEMA_NAME.to_string(),
table_name: table_name.to_string(),
new_table_name: new_table_name.to_string(),
};
assert!(catalog
.rename_table(rename_table_req, table_id)
.await
.unwrap());
assert!(!schema.table_exist(table_name).unwrap());
assert!(schema.table_exist(new_table_name).unwrap());

let registered_table = catalog
.table(DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, new_table_name)
.unwrap()
.unwrap();
assert_eq!(registered_table.table_info().ident.table_id, table_id);
}

#[test]
pub fn test_register_if_absent() {
let list = MemoryCatalogManager::default();
Expand Down
16 changes: 15 additions & 1 deletion src/catalog/src/remote/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ use crate::remote::{Kv, KvBackendRef};
use crate::{
handle_system_table_request, CatalogList, CatalogManager, CatalogProvider, CatalogProviderRef,
DeregisterTableRequest, RegisterSchemaRequest, RegisterSystemTableRequest,
RegisterTableRequest, SchemaProvider, SchemaProviderRef,
RegisterTableRequest, RenameTableRequest, SchemaProvider, SchemaProviderRef,
};

/// Catalog manager based on metasrv.
Expand Down Expand Up @@ -449,6 +449,13 @@ impl CatalogManager for RemoteCatalogManager {
Ok(true)
}

async fn rename_table(&self, _request: RenameTableRequest, _table_id: TableId) -> Result<bool> {
UnimplementedSnafu {
operation: "rename table",
}
.fail()
}

async fn register_system_table(&self, request: RegisterSystemTableRequest) -> Result<()> {
let mut requests = self.system_table_requests.lock().await;
requests.push(request);
Expand Down Expand Up @@ -739,6 +746,13 @@ impl SchemaProvider for RemoteSchemaProvider {
prev
}

fn rename_table(&self, _name: &str, _new_name: String) -> Result<TableRef> {
UnimplementedSnafu {
operation: "rename table",
}
.fail()
}

fn deregister_table(&self, name: &str) -> Result<Option<TableRef>> {
let table_name = name.to_string();
let table_key = self.build_regional_table_key(&table_name).to_string();
Expand Down
4 changes: 4 additions & 0 deletions src/catalog/src/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ pub trait SchemaProvider: Sync + Send {
/// If a table of the same name existed before, it returns "Table already exists" error.
fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>>;

/// If supported by the implementation, renames an existing table from this schema and returns it.
/// If no table of that name exists, returns "Table not found" error.
fn rename_table(&self, name: &str, new_name: String) -> Result<TableRef>;

/// If supported by the implementation, removes an existing table from this schema and returns it.
/// If no table of that name exists, returns Ok(None).
fn deregister_table(&self, name: &str) -> Result<Option<TableRef>>;
Expand Down
35 changes: 24 additions & 11 deletions src/catalog/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,11 +186,23 @@ fn build_system_catalog_schema() -> Schema {
SchemaBuilder::try_from(cols).unwrap().build().unwrap()
}

pub fn build_table_insert_request(full_table_name: String, table_id: TableId) -> InsertRequest {
/// Formats key string for table entry in system catalog
#[inline]
pub fn format_table_entry_key(catalog: &str, schema: &str, table_id: TableId) -> String {
e1ijah1 marked this conversation as resolved.
Show resolved Hide resolved
format!("{catalog}.{schema}.{table_id}")
}

pub fn build_table_insert_request(
catalog: String,
schema: String,
table_name: String,
table_id: TableId,
) -> InsertRequest {
let entry_key = format_table_entry_key(&catalog, &schema, table_id);
build_insert_request(
EntryType::Table,
full_table_name.as_bytes(),
serde_json::to_string(&TableEntryValue { table_id })
entry_key.as_bytes(),
serde_json::to_string(&TableEntryValue { table_name })
.unwrap()
.as_bytes(),
)
Expand Down Expand Up @@ -285,8 +297,8 @@ pub fn decode_system_catalog(
}

EntryType::Table => {
// As for table entry, the key is a string with format: `<catalog_name>.<schema_name>.<table_name>`
// and the value is a JSON string with format: `{"table_id": <table_id>}`
// As for table entry, the key is a string with format: `<catalog_name>.<schema_name>.<table_id>`
// and the value is a JSON string with format: `{"table_name": <table_name>}`
let table_parts = key.split('.').collect::<Vec<_>>();
ensure!(
table_parts.len() >= 3,
Expand All @@ -298,11 +310,12 @@ pub fn decode_system_catalog(
debug!("Table meta value: {}", String::from_utf8_lossy(value));
let table_meta: TableEntryValue =
serde_json::from_slice(value).context(ValueDeserializeSnafu)?;
let table_id = table_parts[2].parse::<TableId>().unwrap();
Ok(Entry::Table(TableEntry {
catalog_name: table_parts[0].to_string(),
schema_name: table_parts[1].to_string(),
table_name: table_parts[2].to_string(),
table_id: table_meta.table_id,
table_name: table_meta.table_name,
table_id,
}))
}
}
Expand Down Expand Up @@ -362,7 +375,7 @@ pub struct TableEntry {

#[derive(Debug, Serialize, Deserialize, PartialEq, Eq)]
pub struct TableEntryValue {
pub table_id: TableId,
pub table_name: String,
}

#[cfg(test)]
Expand Down Expand Up @@ -415,8 +428,8 @@ mod tests {
pub fn test_decode_table() {
let entry = decode_system_catalog(
Some(EntryType::Table as u8),
Some("some_catalog.some_schema.some_table".as_bytes()),
Some("{\"table_id\":42}".as_bytes()),
Some("some_catalog.some_schema.42".as_bytes()),
Some("{\"table_name\":\"some_table\"}".as_bytes()),
)
.unwrap();

Expand All @@ -435,7 +448,7 @@ mod tests {
pub fn test_decode_mismatch() {
decode_system_catalog(
Some(EntryType::Table as u8),
Some("some_catalog.some_schema.some_table".as_bytes()),
Some("some_catalog.some_schema.42".as_bytes()),
None,
)
.unwrap();
Expand Down
11 changes: 6 additions & 5 deletions src/catalog/src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ use table::{Table, TableRef};

use crate::error::{Error, InsertCatalogRecordSnafu};
use crate::system::{build_schema_insert_request, build_table_insert_request, SystemCatalogTable};
use crate::{
format_full_table_name, CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef,
};
use crate::{CatalogListRef, CatalogProvider, SchemaProvider, SchemaProviderRef};

/// Tables holds all tables created by user.
pub struct Tables {
Expand Down Expand Up @@ -233,6 +231,10 @@ impl SchemaProvider for InformationSchema {
panic!("System catalog & schema does not support register table")
}

fn rename_table(&self, _name: &str, _new_name: String) -> crate::error::Result<TableRef> {
unimplemented!("System catalog & schema does not support rename table")
}

fn deregister_table(&self, _name: &str) -> crate::error::Result<Option<TableRef>> {
panic!("System catalog & schema does not support deregister table")
}
Expand Down Expand Up @@ -269,8 +271,7 @@ impl SystemCatalog {
table_name: String,
table_id: TableId,
) -> crate::error::Result<usize> {
let full_table_name = format_full_table_name(&catalog, &schema, &table_name);
let request = build_table_insert_request(full_table_name, table_id);
let request = build_table_insert_request(catalog, schema, table_name, table_id);
self.information_schema
.system
.insert(request)
Expand Down
Loading