Skip to content

Commit

Permalink
fix/database-base-ttl:
Browse files Browse the repository at this point in the history
 Refactor error handling and remove schema dependency in table creation

 - Replace expect with the ? operator for error handling in open_compaction_region
 - Simplify create_logical_tables by removing catalog and schema name parameters
 - Remove unnecessary schema retrieval and merging of schema options in create_table_info
 - Clean up unused imports and redundant code
  • Loading branch information
v0y4g3r committed Nov 3, 2024
1 parent 5892f2a commit ca78c1d
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 52 deletions.
3 changes: 1 addition & 2 deletions src/mito2/src/compaction/compactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,8 +177,7 @@ pub async fn open_compaction_region(
current_version.options.ttl,
&schema_metadata_manager,
)
.await
.expect("===");
.await?;
Ok(CompactionRegion {
region_id: req.region_id,
region_options: req.region_options.clone(),
Expand Down
4 changes: 1 addition & 3 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -758,10 +758,8 @@ impl Inserter {
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
) -> Result<Vec<TableRef>> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
let res = statement_executor
.create_logical_tables(catalog_name, &schema_name, &create_table_exprs, ctx.clone())
.create_logical_tables(&create_table_exprs, ctx.clone())
.await;

match res {
Expand Down
51 changes: 4 additions & 47 deletions src/operator/src/statement/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use common_error::ext::BoxedError;
use common_meta::cache_invalidator::Context;
use common_meta::ddl::ExecutorContext;
use common_meta::instruction::CacheIdent;
use common_meta::key::schema_name::{SchemaNameKey, SchemaNameValue};
use common_meta::key::schema_name::SchemaNameKey;
use common_meta::key::NAME_PATTERN;
use common_meta::rpc::ddl::{
CreateFlowTask, DdlTask, DropFlowTask, DropViewTask, SubmitDdlTaskRequest,
Expand Down Expand Up @@ -177,15 +177,8 @@ impl StatementExecutor {
.table_options
.contains_key(LOGICAL_TABLE_METADATA_KEY)
{
let catalog_name = &create_table.catalog_name;
let schema_name = &create_table.schema_name;
return self
.create_logical_tables(
catalog_name,
schema_name,
&[create_table.clone()],
query_ctx,
)
.create_logical_tables(&[create_table.clone()], query_ctx)
.await?
.into_iter()
.next()
Expand All @@ -195,22 +188,6 @@ impl StatementExecutor {
}

let _timer = crate::metrics::DIST_CREATE_TABLE.start_timer();
let schema = self
.table_metadata_manager
.schema_manager()
.get(SchemaNameKey::new(
&create_table.catalog_name,
&create_table.schema_name,
))
.await
.context(TableMetadataManagerSnafu)?;

let Some(schema_opts) = schema else {
return SchemaNotFoundSnafu {
schema_info: &create_table.schema_name,
}
.fail();
};

// if table exists.
if let Some(table) = self
Expand Down Expand Up @@ -252,7 +229,7 @@ impl StatementExecutor {
);

let (partitions, partition_cols) = parse_partitions(create_table, partitions, &query_ctx)?;
let mut table_info = create_table_info(create_table, partition_cols, schema_opts)?;
let mut table_info = create_table_info(create_table, partition_cols)?;

let resp = self
.create_table_procedure(
Expand Down Expand Up @@ -285,8 +262,6 @@ impl StatementExecutor {
#[tracing::instrument(skip_all)]
pub async fn create_logical_tables(
&self,
catalog_name: &str,
schema_name: &str,
create_table_exprs: &[CreateTableExpr],
query_context: QueryContextRef,
) -> Result<Vec<TableRef>> {
Expand All @@ -308,19 +283,9 @@ impl StatementExecutor {
);
}

let schema = self
.table_metadata_manager
.schema_manager()
.get(SchemaNameKey::new(catalog_name, schema_name))
.await
.context(TableMetadataManagerSnafu)?
.context(SchemaNotFoundSnafu {
schema_info: schema_name,
})?;

let mut raw_tables_info = create_table_exprs
.iter()
.map(|create| create_table_info(create, vec![], schema.clone()))
.map(|create| create_table_info(create, vec![]))
.collect::<Result<Vec<_>>>()?;
let tables_data = create_table_exprs
.iter()
Expand Down Expand Up @@ -1273,7 +1238,6 @@ fn parse_partitions(
fn create_table_info(
create_table: &CreateTableExpr,
partition_columns: Vec<String>,
schema_opts: SchemaNameValue,
) -> Result<RawTableInfo> {
let mut column_schemas = Vec::with_capacity(create_table.column_defs.len());
let mut column_name_to_index_map = HashMap::new();
Expand Down Expand Up @@ -1322,7 +1286,6 @@ fn create_table_info(

let table_options = TableOptions::try_from_iter(&create_table.table_options)
.context(UnrecognizedTableOptionSnafu)?;
let table_options = merge_options(table_options, schema_opts);

let meta = RawTableMeta {
schema: raw_schema,
Expand Down Expand Up @@ -1507,12 +1470,6 @@ fn convert_value(
.context(ParseSqlValueSnafu)
}

/// Merge table level table options with schema level table options.
fn merge_options(mut table_opts: TableOptions, schema_opts: SchemaNameValue) -> TableOptions {
table_opts.ttl = table_opts.ttl.or(schema_opts.ttl);
table_opts
}

#[cfg(test)]
mod test {
use session::context::{QueryContext, QueryContextBuilder};
Expand Down

0 comments on commit ca78c1d

Please sign in to comment.