Skip to content

Commit

Permalink
refactor!: Uses table id to locate tables in table engines (#1817)
Browse files Browse the repository at this point in the history
* refactor: add table_id to get_table()/table_exists()

* refactor: Add table_id to alter table request

* refactor: Add table id to DropTableRequest

* refactor: add table id to DropTableRequest

* refactor: Use table id as key for the tables map

* refactor: use table id as file engine's map key

* refactor: Remove table reference from engine's get_table/table_exists

* style: remove unused imports

* feat!: Add table id to TableRegionalValue

* style: fix cilppy

* chore: add comments and logs
  • Loading branch information
evenyag authored Jun 25, 2023
1 parent 223cf31 commit fd412b7
Show file tree
Hide file tree
Showing 27 changed files with 303 additions and 432 deletions.
3 changes: 3 additions & 0 deletions src/catalog/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,9 @@ impl TableRegionalKey {
/// region ids allocated by metasrv.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TableRegionalValue {
// We can remove the `Option` from the table id once all regional values
// stored in meta have table ids.
pub table_id: Option<TableId>,
pub version: TableVersion,
pub regions_ids: Vec<u32>,
pub engine_name: Option<String>,
Expand Down
51 changes: 32 additions & 19 deletions src/catalog/src/remote/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -984,21 +984,29 @@ impl SchemaProvider for RemoteSchemaProvider {
.get(key.as_bytes())
.await?
.map(|Kv(_, v)| {
let TableRegionalValue { engine_name, .. } =
TableRegionalValue::parse(String::from_utf8_lossy(&v))
.context(InvalidCatalogValueSnafu)?;
let reference = TableReference {
catalog: &self.catalog_name,
schema: &self.schema_name,
table: name,
let TableRegionalValue {
table_id,
engine_name,
..
} = TableRegionalValue::parse(String::from_utf8_lossy(&v))
.context(InvalidCatalogValueSnafu)?;

let Some(table_id) = table_id else {
warn!("Cannot find table id for {key}, the value has an old format");
return Ok(None);
};
let engine_name = engine_name.as_deref().unwrap_or(MITO_ENGINE);
let engine = self
.engine_manager
.engine(engine_name)
.context(TableEngineNotFoundSnafu { engine_name })?;
let reference = TableReference {
catalog: &self.catalog_name,
schema: &self.schema_name,
table: name,
};
let table = engine
.get_table(&EngineContext {}, &reference)
.get_table(&EngineContext {}, table_id)
.with_context(|_| OpenTableSnafu {
table_info: reference.to_string(),
})?;
Expand All @@ -1011,9 +1019,12 @@ impl SchemaProvider for RemoteSchemaProvider {
}

async fn register_table(&self, name: String, table: TableRef) -> Result<Option<TableRef>> {
// Currently, initiate_tables() always call this method to register the table to the schema thus we
// always update the region value.
let table_info = table.table_info();
let table_version = table_info.ident.version;
let table_value = TableRegionalValue {
table_id: Some(table_info.ident.table_id),
version: table_version,
regions_ids: table.table_info().meta.region_numbers.clone(),
engine_name: Some(table_info.meta.engine.clone()),
Expand Down Expand Up @@ -1061,25 +1072,27 @@ impl SchemaProvider for RemoteSchemaProvider {
.get(table_key.as_bytes())
.await?
.map(|Kv(_, v)| {
let TableRegionalValue { engine_name, .. } =
TableRegionalValue::parse(String::from_utf8_lossy(&v))
.context(InvalidCatalogValueSnafu)?;
Ok(engine_name)
let TableRegionalValue {
table_id,
engine_name,
..
} = TableRegionalValue::parse(String::from_utf8_lossy(&v))
.context(InvalidCatalogValueSnafu)?;
Ok(engine_name.and_then(|name| table_id.map(|id| (name, id))))
})
.transpose()?
.flatten();

let engine_name = engine_opt.as_deref().unwrap_or_else(|| {
warn!("Cannot find table engine name for {table_key}");
MITO_ENGINE
});

self.backend.delete(table_key.as_bytes()).await?;
debug!(
"Successfully deleted catalog table entry, key: {}",
table_key
);

let Some((engine_name, table_id)) = engine_opt else {
warn!("Cannot find table id and engine name for {table_key}");
return Ok(None);
};
let reference = TableReference {
catalog: &self.catalog_name,
schema: &self.schema_name,
Expand All @@ -1088,9 +1101,9 @@ impl SchemaProvider for RemoteSchemaProvider {
// deregistering table does not necessarily mean dropping the table
let table = self
.engine_manager
.engine(engine_name)
.engine(&engine_name)
.context(TableEngineNotFoundSnafu { engine_name })?
.get_table(&EngineContext {}, &reference)
.get_table(&EngineContext {}, table_id)
.with_context(|_| OpenTableSnafu {
table_info: reference.to_string(),
})?;
Expand Down
60 changes: 15 additions & 45 deletions src/catalog/src/remote/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ use std::any::Any;
use std::collections::btree_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::fmt::{Display, Formatter};
use std::str::FromStr;
use std::sync::Arc;
use std::sync::{Arc, RwLock as StdRwLock};

use async_stream::stream;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
Expand All @@ -27,7 +26,7 @@ use datatypes::data_type::ConcreteDataType;
use datatypes::schema::{ColumnSchema, Schema};
use datatypes::vectors::StringVector;
use serde::Serializer;
use table::engine::{CloseTableResult, EngineContext, TableEngine, TableReference};
use table::engine::{CloseTableResult, EngineContext, TableEngine};
use table::metadata::TableId;
use table::requests::{
AlterTableRequest, CloseTableRequest, CreateTableRequest, DropTableRequest, OpenTableRequest,
Expand Down Expand Up @@ -167,7 +166,7 @@ impl KvBackend for MockKvBackend {

#[derive(Default)]
pub struct MockTableEngine {
tables: RwLock<HashMap<String, TableRef>>,
tables: StdRwLock<HashMap<TableId, TableRef>>,
}

#[async_trait::async_trait]
Expand All @@ -182,21 +181,8 @@ impl TableEngine for MockTableEngine {
_ctx: &EngineContext,
request: CreateTableRequest,
) -> table::Result<TableRef> {
let table_name = request.table_name.clone();
let catalog_name = request.catalog_name.clone();
let schema_name = request.schema_name.clone();
let table_full_name =
TableReference::full(&catalog_name, &schema_name, &table_name).to_string();
let table_id = request.id;

let default_table_id = "0".to_owned();
let table_id = TableId::from_str(
request
.table_options
.extra_options
.get("table_id")
.unwrap_or(&default_table_id),
)
.unwrap();
let schema = Arc::new(Schema::new(vec![ColumnSchema::new(
"name",
ConcreteDataType::string_datatype(),
Expand All @@ -206,16 +192,16 @@ impl TableEngine for MockTableEngine {
let data = vec![Arc::new(StringVector::from(vec!["a", "b", "c"])) as _];
let record_batch = RecordBatch::new(schema, data).unwrap();
let table: TableRef = Arc::new(MemTable::new_with_catalog(
&table_name,
&request.table_name,
record_batch,
table_id,
catalog_name,
schema_name,
request.catalog_name,
request.schema_name,
vec![0],
)) as Arc<_>;

let mut tables = self.tables.write().await;
tables.insert(table_full_name, table.clone() as TableRef);
let mut tables = self.tables.write().unwrap();
tables.insert(table_id, table.clone() as TableRef);
Ok(table)
}

Expand All @@ -224,7 +210,7 @@ impl TableEngine for MockTableEngine {
_ctx: &EngineContext,
request: OpenTableRequest,
) -> table::Result<Option<TableRef>> {
Ok(self.tables.read().await.get(&request.table_name).cloned())
Ok(self.tables.read().unwrap().get(&request.table_id).cloned())
}

async fn alter_table(
Expand All @@ -238,25 +224,13 @@ impl TableEngine for MockTableEngine {
fn get_table(
&self,
_ctx: &EngineContext,
table_ref: &TableReference,
table_id: TableId,
) -> table::Result<Option<TableRef>> {
futures::executor::block_on(async {
Ok(self
.tables
.read()
.await
.get(&table_ref.to_string())
.cloned())
})
Ok(self.tables.read().unwrap().get(&table_id).cloned())
}

fn table_exists(&self, _ctx: &EngineContext, table_ref: &TableReference) -> bool {
futures::executor::block_on(async {
self.tables
.read()
.await
.contains_key(&table_ref.to_string())
})
fn table_exists(&self, _ctx: &EngineContext, table_id: TableId) -> bool {
self.tables.read().unwrap().contains_key(&table_id)
}

async fn drop_table(
Expand All @@ -272,11 +246,7 @@ impl TableEngine for MockTableEngine {
_ctx: &EngineContext,
request: CloseTableRequest,
) -> table::Result<CloseTableResult> {
let _ = self
.tables
.write()
.await
.remove(&request.table_ref().to_string());
let _ = self.tables.write().unwrap().remove(&request.table_id);
Ok(CloseTableResult::Released(vec![]))
}

Expand Down
13 changes: 7 additions & 6 deletions src/catalog/src/remote/region_alive_keeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -475,6 +475,7 @@ impl CountdownTask {
catalog_name: table_ident.catalog.clone(),
schema_name: table_ident.schema.clone(),
table_name: table_ident.table.clone(),
table_id: table_ident.table_id,
region_numbers: vec![region],
flush: true,
};
Expand All @@ -499,7 +500,7 @@ mod test {
use common_meta::heartbeat::mailbox::HeartbeatMailbox;
use datatypes::schema::RawSchema;
use table::engine::manager::MemoryTableEngineManager;
use table::engine::{TableEngine, TableReference};
use table::engine::TableEngine;
use table::requests::{CreateTableRequest, TableOptions};
use table::test_util::EmptyTable;

Expand Down Expand Up @@ -751,8 +752,9 @@ mod test {
let catalog = "my_catalog";
let schema = "my_schema";
let table = "my_table";
let table_id = 1;
let request = CreateTableRequest {
id: 1,
id: table_id,
catalog_name: catalog.to_string(),
schema_name: schema.to_string(),
table_name: table.to_string(),
Expand All @@ -768,7 +770,6 @@ mod test {
table_options: TableOptions::default(),
engine: "mito".to_string(),
};
let table_ref = TableReference::full(catalog, schema, table);

let table_engine = Arc::new(MockTableEngine::default());
table_engine.create_table(ctx, request).await.unwrap();
Expand All @@ -777,7 +778,7 @@ mod test {
catalog: catalog.to_string(),
schema: schema.to_string(),
table: table.to_string(),
table_id: 1024,
table_id,
engine: "mito".to_string(),
};
let (tx, rx) = mpsc::channel(10);
Expand Down Expand Up @@ -813,9 +814,9 @@ mod test {
.unwrap();

// assert the table is closed after deadline is reached
assert!(table_engine.table_exists(ctx, &table_ref));
assert!(table_engine.table_exists(ctx, table_id));
// spare 500ms for the task to close the table
tokio::time::sleep(Duration::from_millis(2000)).await;
assert!(!table_engine.table_exists(ctx, &table_ref));
assert!(!table_engine.table_exists(ctx, table_id));
}
}
11 changes: 7 additions & 4 deletions src/common/grpc-expr/src/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ const LOCATION_TYPE_FIRST: i32 = LocationType::First as i32;
const LOCATION_TYPE_AFTER: i32 = LocationType::After as i32;

/// Convert an [`AlterExpr`] to an [`AlterTableRequest`]
pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
pub fn alter_expr_to_request(table_id: TableId, expr: AlterExpr) -> Result<AlterTableRequest> {
let catalog_name = expr.catalog_name;
let schema_name = expr.schema_name;
let kind = expr.kind.context(MissingFieldSnafu { field: "kind" })?;
Expand Down Expand Up @@ -69,6 +69,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
catalog_name,
schema_name,
table_name: expr.table_name,
table_id,
alter_kind,
};
Ok(request)
Expand All @@ -82,6 +83,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
catalog_name,
schema_name,
table_name: expr.table_name,
table_id,
alter_kind,
};
Ok(request)
Expand All @@ -92,6 +94,7 @@ pub fn alter_expr_to_request(expr: AlterExpr) -> Result<AlterTableRequest> {
catalog_name,
schema_name,
table_name: expr.table_name,
table_id,
alter_kind,
};
Ok(request)
Expand Down Expand Up @@ -239,7 +242,7 @@ mod tests {
})),
};

let alter_request = alter_expr_to_request(expr).unwrap();
let alter_request = alter_expr_to_request(1, expr).unwrap();
assert_eq!(alter_request.catalog_name, "");
assert_eq!(alter_request.schema_name, "");
assert_eq!("monitor".to_string(), alter_request.table_name);
Expand Down Expand Up @@ -296,7 +299,7 @@ mod tests {
})),
};

let alter_request = alter_expr_to_request(expr).unwrap();
let alter_request = alter_expr_to_request(1, expr).unwrap();
assert_eq!(alter_request.catalog_name, "");
assert_eq!(alter_request.schema_name, "");
assert_eq!("monitor".to_string(), alter_request.table_name);
Expand Down Expand Up @@ -344,7 +347,7 @@ mod tests {
})),
};

let alter_request = alter_expr_to_request(expr).unwrap();
let alter_request = alter_expr_to_request(1, expr).unwrap();
assert_eq!(alter_request.catalog_name, "test_catalog");
assert_eq!(alter_request.schema_name, "test_schema");
assert_eq!("monitor".to_string(), alter_request.table_name);
Expand Down
3 changes: 2 additions & 1 deletion src/datanode/src/heartbeat/handler/close_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl CloseRegionHandler {
}

if engine
.get_table(&ctx, table_ref)
.get_table(&ctx, region_ident.table_ident.table_id)
.with_context(|_| error::GetTableSnafu {
table_name: table_ref.to_string(),
})?
Expand All @@ -178,6 +178,7 @@ impl CloseRegionHandler {
schema_name: table_ref.schema.to_string(),
table_name: table_ref.table.to_string(),
region_numbers: region_numbers.clone(),
table_id: region_ident.table_ident.table_id,
flush: true,
},
)
Expand Down
11 changes: 10 additions & 1 deletion src/datanode/src/instance/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,18 +106,27 @@ impl Instance {
let name = alter_table.table_name().clone();
let (catalog, schema, table) = table_idents_to_full_name(&name, query_ctx.clone())?;
let table_ref = TableReference::full(&catalog, &schema, &table);
let req = SqlHandler::alter_to_request(alter_table, table_ref)?;
// Currently, we have to get the table multiple times. Consider remove the sql handler in the future.
let table = self.sql_handler.get_table(&table_ref).await?;
let req = SqlHandler::alter_to_request(
alter_table,
table_ref,
table.table_info().ident.table_id,
)?;
self.sql_handler
.execute(SqlRequest::Alter(req), query_ctx)
.await
}
Statement::DropTable(drop_table) => {
let (catalog_name, schema_name, table_name) =
table_idents_to_full_name(drop_table.table_name(), query_ctx.clone())?;
let table_ref = TableReference::full(&catalog_name, &schema_name, &table_name);
let table = self.sql_handler.get_table(&table_ref).await?;
let req = DropTableRequest {
catalog_name,
schema_name,
table_name,
table_id: table.table_info().ident.table_id,
};
self.sql_handler
.execute(SqlRequest::DropTable(req), query_ctx)
Expand Down
Loading

0 comments on commit fd412b7

Please sign in to comment.