Skip to content

Commit

Permalink
refactor: unify the styling in create_or_alter_tables_on_demand (#4756
Browse files Browse the repository at this point in the history
)

* refactor: refactor `create_or_alter_tables_on_demand`

* chore: apply suggestions from CR
  • Loading branch information
WenyXu committed Sep 24, 2024
1 parent 54f6e13 commit d1b2527
Showing 1 changed file with 82 additions and 94 deletions.
176 changes: 82 additions & 94 deletions src/operator/src/insert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -505,28 +505,25 @@ impl Inserter {
let table_info = table.table_info();
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
if let Some(alter_expr) =
self.get_alter_table_expr_on_demand(req, table, ctx)?
self.get_alter_table_expr_on_demand(req, &table, ctx)?
{
alter_tables.push(alter_expr);
}
}
None => {
create_tables.push(req);
let create_expr =
self.get_create_table_expr_on_demand(req, &auto_create_table_type, ctx)?;
create_tables.push(create_expr);
}
}
}

match auto_create_table_type {
AutoCreateTableType::Logical(on_physical_table) => {
AutoCreateTableType::Logical(_) => {
if !create_tables.is_empty() {
// Creates logical tables in batch.
let tables = self
.create_logical_tables(
create_tables,
ctx,
&on_physical_table,
statement_executor,
)
.create_logical_tables(create_tables, ctx, statement_executor)
.await?;

for table in tables {
Expand All @@ -544,14 +541,9 @@ impl Inserter {
AutoCreateTableType::Physical
| AutoCreateTableType::Log
| AutoCreateTableType::LastNonNull => {
for req in create_tables {
for create_table in create_tables {
let table = self
.create_non_logical_table(
req,
ctx,
statement_executor,
auto_create_table_type.clone(),
)
.create_physical_table(create_table, ctx, statement_executor)
.await?;
let table_info = table.table_info();
table_name_to_ids.insert(table_info.name.clone(), table_info.table_id());
Expand Down Expand Up @@ -605,7 +597,8 @@ impl Inserter {
options: None,
},
];
let create_table_expr = &mut build_create_table_expr(&table_reference, &default_schema)?;
let create_table_expr =
&mut build_create_table_expr(&table_reference, &default_schema, default_engine())?;

create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
create_table_expr
Expand Down Expand Up @@ -641,10 +634,61 @@ impl Inserter {
.context(CatalogSnafu)
}

fn get_create_table_expr_on_demand(
&self,
req: &RowInsertRequest,
create_type: &AutoCreateTableType,
ctx: &QueryContextRef,
) -> Result<CreateTableExpr> {
let mut table_options = Vec::with_capacity(4);
if let Some(ttl) = ctx.extension(TTL_KEY) {
table_options.push((TTL_KEY, ttl));
}

let mut engine_name = default_engine();
match create_type {
AutoCreateTableType::Logical(physical_table) => {
engine_name = METRIC_ENGINE_NAME;
table_options.push((LOGICAL_TABLE_METADATA_KEY, physical_table));
}
AutoCreateTableType::Physical => {
if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
table_options.push((APPEND_MODE_KEY, append_mode));
}
if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
table_options.push((MERGE_MODE_KEY, merge_mode));
}
}
// Set append_mode to true for log table.
// because log tables should keep rows with the same ts and tags.
AutoCreateTableType::Log => {
table_options.push((APPEND_MODE_KEY, "true"));
}
AutoCreateTableType::LastNonNull => {
table_options.push((MERGE_MODE_KEY, "last_non_null"));
}
}

let schema = ctx.current_schema();
let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
// SAFETY: `req.rows` is guaranteed to be `Some` by `handle_row_inserts_with_create_type()`.
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let mut create_table_expr =
build_create_table_expr(&table_ref, request_schema, engine_name)?;
info!("Table `{table_ref}` does not exist, try creating table");
for (k, v) in table_options {
create_table_expr
.table_options
.insert(k.to_string(), v.to_string());
}

Ok(create_table_expr)
}

fn get_alter_table_expr_on_demand(
&self,
req: &RowInsertRequest,
table: TableRef,
table: &TableRef,
ctx: &QueryContextRef,
) -> Result<Option<AlterExpr>> {
let catalog_name = ctx.current_catalog();
Expand All @@ -667,76 +711,37 @@ impl Inserter {
}))
}

/// Creates a non-logical table by create type.
/// # Panics
/// Panics if `create_type` is `AutoCreateTableType::Logical`.
async fn create_non_logical_table(
&self,
req: &RowInsertRequest,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
create_type: AutoCreateTableType,
) -> Result<TableRef> {
let mut hint_options = vec![];

if let Some(ttl) = ctx.extension(TTL_KEY) {
hint_options.push((TTL_KEY, ttl));
}

match create_type {
AutoCreateTableType::Logical(_) => unreachable!(),
AutoCreateTableType::Physical => {
if let Some(append_mode) = ctx.extension(APPEND_MODE_KEY) {
hint_options.push((APPEND_MODE_KEY, append_mode));
}
if let Some(merge_mode) = ctx.extension(MERGE_MODE_KEY) {
hint_options.push((MERGE_MODE_KEY, merge_mode));
}
}
// Set append_mode to true for log table.
// because log tables should keep rows with the same ts and tags.
AutoCreateTableType::Log => {
hint_options.push((APPEND_MODE_KEY, "true"));
}
AutoCreateTableType::LastNonNull => {
hint_options.push((MERGE_MODE_KEY, "last_non_null"));
}
}
let options: &[(&str, &str)] = hint_options.as_slice();

self.create_table_with_options(req, ctx, statement_executor, options)
.await
}

/// Creates a table with options.
async fn create_table_with_options(
async fn create_physical_table(
&self,
req: &RowInsertRequest,
mut create_table_expr: CreateTableExpr,
ctx: &QueryContextRef,
statement_executor: &StatementExecutor,
options: &[(&str, &str)],
) -> Result<TableRef> {
let schema = ctx.current_schema();
let table_ref = TableReference::full(ctx.current_catalog(), &schema, &req.table_name);
// SAFETY: `req.rows` is guaranteed to be `Some` by `handle_row_inserts_with_create_type()`.
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let create_table_expr = &mut build_create_table_expr(&table_ref, request_schema)?;
{
let table_ref = TableReference::full(
&create_table_expr.catalog_name,
&create_table_expr.schema_name,
&create_table_expr.table_name,
);

info!("Table `{table_ref}` does not exist, try creating table");
for (k, v) in options {
create_table_expr
.table_options
.insert(k.to_string(), v.to_string());
info!("Table `{table_ref}` does not exist, try creating table");
}
let res = statement_executor
.create_table_inner(create_table_expr, None, ctx.clone())
.create_table_inner(&mut create_table_expr, None, ctx.clone())
.await;

let table_ref = TableReference::full(
&create_table_expr.catalog_name,
&create_table_expr.schema_name,
&create_table_expr.table_name,
);

match res {
Ok(table) => {
info!(
"Successfully created table {} with options: {:?}",
table_ref, options
table_ref, create_table_expr.table_options,
);
Ok(table)
}
Expand All @@ -749,30 +754,12 @@ impl Inserter {

async fn create_logical_tables(
&self,
create_tables: Vec<&RowInsertRequest>,
create_table_exprs: Vec<CreateTableExpr>,
ctx: &QueryContextRef,
physical_table: &str,
statement_executor: &StatementExecutor,
) -> Result<Vec<TableRef>> {
let catalog_name = ctx.current_catalog();
let schema_name = ctx.current_schema();
let create_table_exprs = create_tables
.iter()
.map(|req| {
let table_ref = TableReference::full(catalog_name, &schema_name, &req.table_name);
let request_schema = req.rows.as_ref().unwrap().schema.as_slice();
let mut create_table_expr = build_create_table_expr(&table_ref, request_schema)?;

create_table_expr.engine = METRIC_ENGINE_NAME.to_string();
create_table_expr.table_options.insert(
LOGICAL_TABLE_METADATA_KEY.to_string(),
physical_table.to_string(),
);

Ok(create_table_expr)
})
.collect::<Result<Vec<_>>>()?;

let res = statement_executor
.create_logical_tables(catalog_name, &schema_name, &create_table_exprs, ctx.clone())
.await;
Expand Down Expand Up @@ -827,6 +814,7 @@ fn validate_column_count_match(requests: &RowInsertRequests) -> Result<()> {
fn build_create_table_expr(
table: &TableReference,
request_schema: &[ColumnSchema],
engine: &str,
) -> Result<CreateTableExpr> {
CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, default_engine())
CreateExprFactory.create_table_expr_by_column_schemas(table, request_schema, engine)
}

0 comments on commit d1b2527

Please sign in to comment.