Skip to content

Commit

Permalink
fix: table engine name
Browse files Browse the repository at this point in the history
  • Loading branch information
v0y4g3r committed Apr 25, 2023
1 parent 7ae02c0 commit 26abb32
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 43 deletions.
1 change: 1 addition & 0 deletions src/catalog/src/helper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ impl TableRegionalKey {
pub struct TableRegionalValue {
pub version: TableVersion,
pub regions_ids: Vec<u32>,
pub engine_name: Option<String>,
}

pub struct CatalogKey {
Expand Down
66 changes: 32 additions & 34 deletions src/catalog/src/remote/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -760,40 +760,37 @@ impl SchemaProvider for RemoteSchemaProvider {

async fn table(&self, name: &str) -> Result<Option<TableRef>> {
let key = self.build_regional_table_key(name).to_string();
info!("Try get table {}", name);
let table_opt =
self.backend
.get(key.as_bytes())
.await?
.map(|Kv(_, v)| {
// TODO(hl): we should put engine info in table regional value
// so that here we can use correct engine name
let TableRegionalValue { .. } =
TableRegionalValue::parse(String::from_utf8_lossy(&v))
.context(InvalidCatalogValueSnafu)?;

let reference = TableReference {
catalog: &self.catalog_name,
schema: &self.schema_name,
table: name,
};

let engine = self.engine_manager.engine(MITO_ENGINE).context(
TableEngineNotFoundSnafu {
engine_name: MITO_ENGINE,
},
)?;

let table = engine
.get_table(&EngineContext {}, &reference)
.with_context(|_| OpenTableSnafu {
table_info: reference.to_string(),
})?;

Ok(table)
})
.transpose()?
.flatten();
let table_opt = self
.backend
.get(key.as_bytes())
.await?
.map(|Kv(_, v)| {
let TableRegionalValue { engine_name, .. } =
TableRegionalValue::parse(String::from_utf8_lossy(&v))
.context(InvalidCatalogValueSnafu)?;

let reference = TableReference {
catalog: &self.catalog_name,
schema: &self.schema_name,
table: name,
};

let engine_name = engine_name.as_deref().unwrap_or(MITO_ENGINE);
let engine = self
.engine_manager
.engine(engine_name)
.context(TableEngineNotFoundSnafu { engine_name })?;

let table = engine
.get_table(&EngineContext {}, &reference)
.with_context(|_| OpenTableSnafu {
table_info: reference.to_string(),
})?;

Ok(table)
})
.transpose()?
.flatten();

Ok(table_opt)
}
Expand All @@ -804,6 +801,7 @@ impl SchemaProvider for RemoteSchemaProvider {
let table_value = TableRegionalValue {
version: table_version,
regions_ids: table.table_info().meta.region_numbers.clone(),
engine_name: Some(table_info.meta.engine.clone()),
};
let table_key = self.build_regional_table_key(&name).to_string();
self.backend
Expand Down
2 changes: 1 addition & 1 deletion src/common/substrait/src/df_substrait.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;

use async_trait::async_trait;
use bytes::{Buf, Bytes, BytesMut};
use catalog::CatalogManagerRef;
use datafusion::catalog::catalog::CatalogList;
use datafusion::prelude::SessionContext;
use datafusion_expr::LogicalPlan;
use datafusion_substrait::logical_plan::consumer::from_substrait_plan;
Expand Down
2 changes: 2 additions & 0 deletions src/datanode/src/instance/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use api::v1::{CreateDatabaseExpr, DdlRequest, DeleteRequest, InsertRequest};
use async_trait::async_trait;
use catalog::CatalogManagerRef;
use common_query::Output;
use common_telemetry::info;
use datafusion::catalog::catalog::{
CatalogList, CatalogProvider, MemoryCatalogList, MemoryCatalogProvider,
};
Expand Down Expand Up @@ -252,6 +253,7 @@ impl SchemaProvider for DummySchemaProvider {
}

async fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
info!("DummySchemaProvider table: {}", name);
self.catalog_manager
.table(&self.catalog, &self.schema, name)
.await
Expand Down
3 changes: 2 additions & 1 deletion src/table-procedure/src/alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,14 +137,15 @@ impl AlterTableProcedure {
let request = &self.data.request;
let catalog = self
.catalog_manager
.catalog(&request.catalog_name)
.catalog_async(&request.catalog_name)
.await
.context(AccessCatalogSnafu)?
.context(CatalogNotFoundSnafu {
name: &request.catalog_name,
})?;
let schema = catalog
.schema(&request.schema_name)
.await
.context(AccessCatalogSnafu)?
.context(SchemaNotFoundSnafu {
name: &request.schema_name,
Expand Down
6 changes: 1 addition & 5 deletions tests/cases/distributed/alter/rename_table.result
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,9 @@ ALTER TABLE t RENAME new_table;

Affected Rows: 0

DESC TABLE t;

Error: 4001(TableNotFound), Table `t` not exist

DROP TABLE t;

Error: 4001(TableNotFound), Table `greptime.public.t` not exist
Error: 4001(TableNotFound), Table not found: greptime.public.t

DROP TABLE new_table;

Expand Down
2 changes: 0 additions & 2 deletions tests/cases/distributed/alter/rename_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ SELECT * from t;
-- TODO(LFC): Port test cases from standalone env when distribute rename table is implemented (#723).
ALTER TABLE t RENAME new_table;

DESC TABLE t;

DROP TABLE t;

DROP TABLE new_table;

0 comments on commit 26abb32

Please sign in to comment.