Skip to content

Commit

Permalink
feat: local catalog drop table (#913)
Browse files Browse the repository at this point in the history
* feat: local catalog drop table

* Update src/catalog/src/local/manager.rs

Co-authored-by: Lei, HUANG <[email protected]>

* Update src/catalog/src/local/manager.rs

Co-authored-by: Lei, HUANG <[email protected]>

* fix: resolve PR comments

---------

Co-authored-by: Lei, HUANG <[email protected]>
  • Loading branch information
MichaelScofield and v0y4g3r authored Jan 31, 2023
1 parent 89e4084 commit 8149932
Show file tree
Hide file tree
Showing 20 changed files with 312 additions and 220 deletions.
62 changes: 29 additions & 33 deletions src/catalog/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::any::Any;
use std::fmt::Debug;

use common_error::ext::{BoxedError, ErrorExt};
use common_error::prelude::{Snafu, StatusCode};
Expand All @@ -21,6 +22,8 @@ use datatypes::prelude::ConcreteDataType;
use datatypes::schema::RawSchema;
use snafu::{Backtrace, ErrorCompat};

use crate::DeregisterTableRequest;

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum Error {
Expand Down Expand Up @@ -96,18 +99,15 @@ pub enum Error {
#[snafu(display("Table `{}` already exists", table))]
TableExists { table: String, backtrace: Backtrace },

#[snafu(display("Table `{}` not exist", table))]
TableNotExist { table: String, backtrace: Backtrace },

#[snafu(display("Schema {} already exists", schema))]
SchemaExists {
schema: String,
backtrace: Backtrace,
},

#[snafu(display("Failed to register table"))]
RegisterTable {
#[snafu(backtrace)]
source: BoxedError,
},

#[snafu(display("Operation {} not implemented yet", operation))]
Unimplemented {
operation: String,
Expand Down Expand Up @@ -142,6 +142,17 @@ pub enum Error {
source: table::error::Error,
},

#[snafu(display(
"Failed to deregister table, request: {:?}, source: {}",
request,
source
))]
DeregisterTable {
request: DeregisterTableRequest,
#[snafu(backtrace)]
source: table::error::Error,
},

#[snafu(display("Illegal catalog manager state: {}", msg))]
IllegalManagerState { backtrace: Backtrace, msg: String },

Expand All @@ -165,7 +176,10 @@ pub enum Error {
},

#[snafu(display("Failure during SchemaProvider operation, source: {}", source))]
SchemaProviderOperation { source: BoxedError },
SchemaProviderOperation {
#[snafu(backtrace)]
source: BoxedError,
},

#[snafu(display("Failed to execute system catalog table scan, source: {}", source))]
SystemCatalogTableScanExec {
Expand All @@ -178,15 +192,6 @@ pub enum Error {
source: common_catalog::error::Error,
},

#[snafu(display("IO error occurred while fetching catalog info, source: {}", source))]
Io {
backtrace: Backtrace,
source: std::io::Error,
},

#[snafu(display("Local and remote catalog data are inconsistent, msg: {}", msg))]
CatalogStateInconsistent { msg: String, backtrace: Backtrace },

#[snafu(display("Failed to perform metasrv operation, source: {}", source))]
MetaSrv {
#[snafu(backtrace)]
Expand All @@ -198,12 +203,6 @@ pub enum Error {
#[snafu(backtrace)]
source: datatypes::error::Error,
},

#[snafu(display("Catalog internal error: {}", source))]
Internal {
#[snafu(backtrace)]
source: BoxedError,
},
}

pub type Result<T> = std::result::Result<T, Error>;
Expand All @@ -216,37 +215,34 @@ impl ErrorExt for Error {
| Error::TableNotFound { .. }
| Error::IllegalManagerState { .. }
| Error::CatalogNotFound { .. }
| Error::InvalidEntryType { .. }
| Error::CatalogStateInconsistent { .. } => StatusCode::Unexpected,
| Error::InvalidEntryType { .. } => StatusCode::Unexpected,

Error::SystemCatalog { .. }
| Error::EmptyValue { .. }
| Error::ValueDeserialize { .. }
| Error::Io { .. } => StatusCode::StorageUnavailable,
| Error::ValueDeserialize { .. } => StatusCode::StorageUnavailable,

Error::RegisterTable { .. } | Error::SystemCatalogTypeMismatch { .. } => {
StatusCode::Internal
}
Error::SystemCatalogTypeMismatch { .. } => StatusCode::Internal,

Error::ReadSystemCatalog { source, .. } => source.status_code(),
Error::InvalidCatalogValue { source, .. } => source.status_code(),

Error::TableExists { .. } => StatusCode::TableAlreadyExists,
Error::TableNotExist { .. } => StatusCode::TableNotFound,
Error::SchemaExists { .. } => StatusCode::InvalidArguments,

Error::OpenSystemCatalog { source, .. }
| Error::CreateSystemCatalog { source, .. }
| Error::InsertCatalogRecord { source, .. }
| Error::OpenTable { source, .. }
| Error::CreateTable { source, .. } => source.status_code(),
| Error::CreateTable { source, .. }
| Error::DeregisterTable { source, .. } => source.status_code(),

Error::MetaSrv { source, .. } => source.status_code(),
Error::SystemCatalogTableScan { source } => source.status_code(),
Error::SystemCatalogTableScanExec { source } => source.status_code(),
Error::InvalidTableSchema { source, .. } => source.status_code(),
Error::InvalidTableInfoInCatalog { .. } => StatusCode::Unexpected,
Error::Internal { source, .. } | Error::SchemaProviderOperation { source } => {
source.status_code()
}
Error::SchemaProviderOperation { source } => source.status_code(),

Error::Unimplemented { .. } => StatusCode::Unsupported,
}
Expand Down
2 changes: 1 addition & 1 deletion src/catalog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ pub struct RenameTableRequest {
pub table_id: TableId,
}

#[derive(Clone)]
#[derive(Debug, Clone)]
pub struct DeregisterTableRequest {
pub catalog: String,
pub schema: String,
Expand Down
39 changes: 32 additions & 7 deletions src/catalog/src/local/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ use table::table::TableIdProvider;
use table::TableRef;

use crate::error::{
CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, ReadSystemCatalogSnafu, Result,
SchemaExistsSnafu, SchemaNotFoundSnafu, SystemCatalogSnafu, SystemCatalogTypeMismatchSnafu,
TableExistsSnafu, TableNotFoundSnafu, UnimplementedSnafu,
self, CatalogNotFoundSnafu, IllegalManagerStateSnafu, OpenTableSnafu, ReadSystemCatalogSnafu,
Result, SchemaExistsSnafu, SchemaNotFoundSnafu, SystemCatalogSnafu,
SystemCatalogTypeMismatchSnafu, TableExistsSnafu, TableNotFoundSnafu,
};
use crate::local::memory::{MemoryCatalogManager, MemoryCatalogProvider, MemorySchemaProvider};
use crate::system::{
Expand Down Expand Up @@ -419,11 +419,36 @@ impl CatalogManager for LocalCatalogManager {
.is_ok())
}

async fn deregister_table(&self, _request: DeregisterTableRequest) -> Result<bool> {
UnimplementedSnafu {
operation: "deregister table",
async fn deregister_table(&self, request: DeregisterTableRequest) -> Result<bool> {
{
let started = *self.init_lock.lock().await;
ensure!(started, IllegalManagerStateSnafu { msg: "not started" });
}

{
let _ = self.register_lock.lock().await;

let DeregisterTableRequest {
catalog,
schema,
table_name,
} = &request;
let table_id = self
.catalogs
.table(catalog, schema, table_name)?
.with_context(|| error::TableNotExistSnafu {
table: format!("{catalog}.{schema}.{table_name}"),
})?
.table_info()
.ident
.table_id;

if !self.system.deregister_table(&request, table_id).await? {
return Ok(false);
}

self.catalogs.deregister_table(request).await
}
.fail()
}

async fn register_schema(&self, request: RegisterSchemaRequest) -> Result<bool> {
Expand Down
8 changes: 6 additions & 2 deletions src/catalog/src/local/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ use std::sync::{Arc, RwLock};

use common_catalog::consts::MIN_USER_TABLE_ID;
use common_telemetry::error;
use snafu::OptionExt;
use snafu::{ensure, OptionExt};
use table::metadata::TableId;
use table::table::TableIdProvider;
use table::TableRef;

use crate::error::{
CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, TableNotFoundSnafu,
self, CatalogNotFoundSnafu, Result, SchemaNotFoundSnafu, TableExistsSnafu, TableNotFoundSnafu,
};
use crate::schema::SchemaProvider;
use crate::{
Expand Down Expand Up @@ -250,6 +250,10 @@ impl CatalogProvider for MemoryCatalogProvider {
schema: SchemaProviderRef,
) -> Result<Option<SchemaProviderRef>> {
let mut schemas = self.schemas.write().unwrap();
ensure!(
!schemas.contains_key(&name),
error::SchemaExistsSnafu { schema: &name }
);
Ok(schemas.insert(name, schema))
}

Expand Down
Loading

0 comments on commit 8149932

Please sign in to comment.