Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(query): Support generate virtual columns #11590

Merged
merged 36 commits into from
Jun 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
0e50466
feat(query): Support generate virtual columns
b41sh May 18, 2023
31c9d45
Merge branch 'main' into virtual-columns_b41sh
b41sh May 26, 2023
d850291
fix
b41sh May 26, 2023
955d26d
fix
b41sh May 29, 2023
b489b09
fix taplo fmt
b41sh May 29, 2023
75e4207
Merge branch 'main' into virtual-columns_b41sh
b41sh May 29, 2023
664dc32
add tests
b41sh May 29, 2023
3b7c285
fix
b41sh May 29, 2023
e0bd75b
fix
b41sh May 29, 2023
1e252fe
add test
b41sh May 29, 2023
1e7af96
Merge branch 'main' into virtual-columns_b41sh
b41sh May 29, 2023
409a246
fix
b41sh May 29, 2023
381e301
Merge branch 'main' into virtual-columns_b41sh
b41sh May 29, 2023
8af975c
add tests
b41sh May 29, 2023
d11c988
add alter virtual column
b41sh May 30, 2023
f1c9684
fix tests
b41sh May 30, 2023
010e983
add virtual block folder
b41sh May 30, 2023
5f426e2
fix
b41sh May 30, 2023
eb54671
fix
b41sh May 30, 2023
dce728c
fix meta api
b41sh May 31, 2023
9a8a815
merge
b41sh May 31, 2023
1b93c22
Merge branch 'main' into virtual-columns_b41sh
b41sh May 31, 2023
a58c96e
fix
b41sh May 31, 2023
07f0aea
Merge branch 'main' into virtual-columns_b41sh
b41sh May 31, 2023
1cad510
fix
b41sh May 31, 2023
b8165fa
Merge branch 'main' into virtual-columns_b41sh
b41sh May 31, 2023
582d663
Merge branch 'main' into virtual-columns_b41sh
b41sh May 31, 2023
ccbe15e
fix
b41sh Jun 1, 2023
f406f5b
Merge branch 'main' into virtual-columns_b41sh
b41sh Jun 1, 2023
19f57da
Merge branch 'main' into virtual-columns_b41sh
mergify[bot] Jun 1, 2023
89c6d15
Merge branch 'main' into virtual-columns_b41sh
mergify[bot] Jun 1, 2023
f408c74
Merge branch 'main' into virtual-columns_b41sh
mergify[bot] Jun 1, 2023
24081b1
Merge branch 'main' into virtual-columns_b41sh
mergify[bot] Jun 1, 2023
25d262d
Merge branch 'main' into virtual-columns_b41sh
mergify[bot] Jun 1, 2023
3c1783a
Merge branch 'main' into virtual-columns_b41sh
mergify[bot] Jun 1, 2023
acd35e0
Merge branch 'main' into virtual-columns_b41sh
mergify[bot] Jun 1, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,8 @@ build_exceptions! {
LicenceDenied(1112),
UnknownDatamask(1113),
UnmatchColumnDataType(1114),
VirtualColumnNotFound(1115),
VirtualColumnAlreadyExists(1116),

// Data Related Errors

Expand Down
30 changes: 30 additions & 0 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ use common_meta_app::schema::CreateTableLockRevReply;
use common_meta_app::schema::CreateTableLockRevReq;
use common_meta_app::schema::CreateTableReply;
use common_meta_app::schema::CreateTableReq;
use common_meta_app::schema::CreateVirtualColumnReply;
use common_meta_app::schema::CreateVirtualColumnReq;
use common_meta_app::schema::DatabaseInfo;
use common_meta_app::schema::DeleteTableLockRevReq;
use common_meta_app::schema::DropDatabaseReply;
Expand All @@ -32,6 +34,8 @@ use common_meta_app::schema::DropIndexReply;
use common_meta_app::schema::DropIndexReq;
use common_meta_app::schema::DropTableByIdReq;
use common_meta_app::schema::DropTableReply;
use common_meta_app::schema::DropVirtualColumnReply;
use common_meta_app::schema::DropVirtualColumnReq;
use common_meta_app::schema::ExtendTableLockRevReq;
use common_meta_app::schema::GetDatabaseReq;
use common_meta_app::schema::GetTableCopiedFileReply;
Expand All @@ -42,6 +46,7 @@ use common_meta_app::schema::ListDatabaseReq;
use common_meta_app::schema::ListIndexesReq;
use common_meta_app::schema::ListTableLockRevReq;
use common_meta_app::schema::ListTableReq;
use common_meta_app::schema::ListVirtualColumnsReq;
use common_meta_app::schema::RenameDatabaseReply;
use common_meta_app::schema::RenameDatabaseReq;
use common_meta_app::schema::RenameTableReply;
Expand All @@ -58,8 +63,11 @@ use common_meta_app::schema::UndropTableReply;
use common_meta_app::schema::UndropTableReq;
use common_meta_app::schema::UpdateTableMetaReply;
use common_meta_app::schema::UpdateTableMetaReq;
use common_meta_app::schema::UpdateVirtualColumnReply;
use common_meta_app::schema::UpdateVirtualColumnReq;
use common_meta_app::schema::UpsertTableOptionReply;
use common_meta_app::schema::UpsertTableOptionReq;
use common_meta_app::schema::VirtualColumnMeta;
use common_meta_types::GCDroppedDataReply;
use common_meta_types::GCDroppedDataReq;
use common_meta_types::MetaId;
Expand Down Expand Up @@ -111,6 +119,28 @@ pub trait SchemaApi: Send + Sync {
req: ListIndexesReq,
) -> Result<Vec<(u64, String, IndexMeta)>, KVAppError>;

// virtual column

async fn create_virtual_column(
&self,
req: CreateVirtualColumnReq,
) -> Result<CreateVirtualColumnReply, KVAppError>;

async fn update_virtual_column(
&self,
req: UpdateVirtualColumnReq,
) -> Result<UpdateVirtualColumnReply, KVAppError>;

async fn drop_virtual_column(
&self,
req: DropVirtualColumnReq,
) -> Result<DropVirtualColumnReply, KVAppError>;

async fn list_virtual_columns(
&self,
req: ListVirtualColumnsReq,
) -> Result<Vec<VirtualColumnMeta>, KVAppError>;

// table

async fn create_table(&self, req: CreateTableReq) -> Result<CreateTableReply, KVAppError>;
Expand Down
206 changes: 206 additions & 0 deletions src/meta/api/src/schema_api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use common_meta_app::app_error::UnknownDatabaseId;
use common_meta_app::app_error::UnknownIndex;
use common_meta_app::app_error::UnknownTable;
use common_meta_app::app_error::UnknownTableId;
use common_meta_app::app_error::VirtualColumnAlreadyExists;
use common_meta_app::app_error::WrongShare;
use common_meta_app::app_error::WrongShareObject;
use common_meta_app::schema::CountTablesKey;
Expand All @@ -54,6 +55,8 @@ use common_meta_app::schema::CreateTableLockRevReply;
use common_meta_app::schema::CreateTableLockRevReq;
use common_meta_app::schema::CreateTableReply;
use common_meta_app::schema::CreateTableReq;
use common_meta_app::schema::CreateVirtualColumnReply;
use common_meta_app::schema::CreateVirtualColumnReq;
use common_meta_app::schema::DBIdTableName;
use common_meta_app::schema::DatabaseId;
use common_meta_app::schema::DatabaseIdToName;
Expand All @@ -71,6 +74,8 @@ use common_meta_app::schema::DropIndexReply;
use common_meta_app::schema::DropIndexReq;
use common_meta_app::schema::DropTableByIdReq;
use common_meta_app::schema::DropTableReply;
use common_meta_app::schema::DropVirtualColumnReply;
use common_meta_app::schema::DropVirtualColumnReq;
use common_meta_app::schema::EmptyProto;
use common_meta_app::schema::ExtendTableLockRevReq;
use common_meta_app::schema::GetDatabaseReq;
Expand All @@ -85,6 +90,7 @@ use common_meta_app::schema::ListDatabaseReq;
use common_meta_app::schema::ListIndexesReq;
use common_meta_app::schema::ListTableLockRevReq;
use common_meta_app::schema::ListTableReq;
use common_meta_app::schema::ListVirtualColumnsReq;
use common_meta_app::schema::RenameDatabaseReply;
use common_meta_app::schema::RenameDatabaseReq;
use common_meta_app::schema::RenameTableReply;
Expand All @@ -108,9 +114,13 @@ use common_meta_app::schema::UndropTableReply;
use common_meta_app::schema::UndropTableReq;
use common_meta_app::schema::UpdateTableMetaReply;
use common_meta_app::schema::UpdateTableMetaReq;
use common_meta_app::schema::UpdateVirtualColumnReply;
use common_meta_app::schema::UpdateVirtualColumnReq;
use common_meta_app::schema::UpsertTableCopiedFileReq;
use common_meta_app::schema::UpsertTableOptionReply;
use common_meta_app::schema::UpsertTableOptionReq;
use common_meta_app::schema::VirtualColumnMeta;
use common_meta_app::schema::VirtualColumnNameIdent;
use common_meta_app::share::ShareGrantObject;
use common_meta_app::share::ShareId;
use common_meta_app::share::ShareNameIdent;
Expand Down Expand Up @@ -163,6 +173,7 @@ use crate::util::deserialize_u64;
use crate::util::get_index_metas_by_ids;
use crate::util::get_table_by_id_or_err;
use crate::util::get_table_names_by_ids;
use crate::util::get_virtual_column_by_id_or_err;
use crate::util::list_tables_from_share_db;
use crate::util::list_tables_from_unshare_db;
use crate::util::mget_pb_values;
Expand Down Expand Up @@ -1071,6 +1082,201 @@ impl<KV: kvapi::KVApi<Error = MetaError>> SchemaApi for KV {
Ok(index_metas)
}

// virtual column

async fn create_virtual_column(
&self,
req: CreateVirtualColumnReq,
) -> Result<CreateVirtualColumnReply, KVAppError> {
debug!(req = debug(&req), "SchemaApi: {}", func_name!());

let ctx = &func_name!();
let mut trials = txn_trials(None, ctx);
loop {
trials.next().unwrap()?;

let (_, old_virtual_column_opt): (_, Option<VirtualColumnMeta>) =
get_pb_value(self, &req.name_ident).await?;

if old_virtual_column_opt.is_some() {
return Err(KVAppError::AppError(AppError::VirtualColumnAlreadyExists(
VirtualColumnAlreadyExists::new(
req.name_ident.table_id,
format!(
"create virtual column with tenant: {} table_id: {}",
req.name_ident.tenant, req.name_ident.table_id
),
),
)));
}
let virtual_column_meta = VirtualColumnMeta {
table_id: req.name_ident.table_id,
virtual_columns: req.virtual_columns.clone(),
created_on: Utc::now(),
updated_on: None,
};

// Create virtual column by inserting this record:
// (tenant, table_id) -> virtual_column_meta
{
let condition = vec![txn_cond_seq(&req.name_ident, Eq, 0)];
let if_then = vec![txn_op_put(
&req.name_ident,
serialize_struct(&virtual_column_meta)?,
)];

let txn_req = TxnRequest {
condition,
if_then,
else_then: vec![],
};

let (succ, _responses) = send_txn(self, txn_req).await?;

debug!(
req.name_ident = debug(&virtual_column_meta),
succ = display(succ),
"create_virtual_column"
);

if succ {
break;
}
}
}

Ok(CreateVirtualColumnReply {})
}

async fn update_virtual_column(
&self,
req: UpdateVirtualColumnReq,
) -> Result<UpdateVirtualColumnReply, KVAppError> {
debug!(req = debug(&req), "SchemaApi: {}", func_name!());

let ctx = &func_name!();
let mut trials = txn_trials(None, ctx);
loop {
trials.next().unwrap()?;

let (seq, old_virtual_column_meta) =
get_virtual_column_by_id_or_err(self, &req.name_ident, ctx).await?;

let virtual_column_meta = VirtualColumnMeta {
table_id: req.name_ident.table_id,
virtual_columns: req.virtual_columns.clone(),
created_on: old_virtual_column_meta.created_on,
updated_on: Some(Utc::now()),
};

// Update virtual column by inserting this record:
// (tenant, table_id) -> virtual_column_meta
{
let condition = vec![txn_cond_seq(&req.name_ident, Eq, seq)];
let if_then = vec![txn_op_put(
&req.name_ident,
serialize_struct(&virtual_column_meta)?,
)];

let txn_req = TxnRequest {
condition,
if_then,
else_then: vec![],
};

let (succ, _responses) = send_txn(self, txn_req).await?;

debug!(
req.name_ident = debug(&virtual_column_meta),
succ = display(succ),
"update_virtual_column"
);

if succ {
break;
}
}
}

Ok(UpdateVirtualColumnReply {})
}

async fn drop_virtual_column(
&self,
req: DropVirtualColumnReq,
) -> Result<DropVirtualColumnReply, KVAppError> {
debug!(req = debug(&req), "SchemaApi: {}", func_name!());

let ctx = &func_name!();
let mut trials = txn_trials(None, ctx);
loop {
trials.next().unwrap()?;

let (_, _) = get_virtual_column_by_id_or_err(self, &req.name_ident, ctx).await?;

// Drop virtual column by deleting this record:
// (tenant, table_id) -> virtual_column_meta
{
let if_then = vec![txn_op_del(&req.name_ident)];
let txn_req = TxnRequest {
condition: vec![],
if_then,
else_then: vec![],
};

let (succ, _responses) = send_txn(self, txn_req).await?;

debug!(
req.name_ident = debug(&req.name_ident),
succ = display(succ),
"drop_virtual_column"
);

if succ {
break;
}
}
}

Ok(DropVirtualColumnReply {})
}

async fn list_virtual_columns(
&self,
req: ListVirtualColumnsReq,
) -> Result<Vec<VirtualColumnMeta>, KVAppError> {
debug!(req = debug(&req), "SchemaApi: {}", func_name!());

if let Some(table_id) = req.table_id {
let name_ident = VirtualColumnNameIdent {
tenant: req.tenant.clone(),
table_id,
};
let (_, virtual_column_opt): (_, Option<VirtualColumnMeta>) =
get_pb_value(self, &name_ident).await?;

if let Some(virtual_column) = virtual_column_opt {
return Ok(vec![virtual_column]);
} else {
return Ok(vec![]);
}
}

// Get virtual columns list by `prefix_list` "<prefix>/<tenant>"
let prefix_key = kvapi::KeyBuilder::new_prefixed(VirtualColumnNameIdent::PREFIX)
.push_str(&req.tenant)
.done();

let list = self.prefix_list_kv(&prefix_key).await?;
let mut virtual_column_list = Vec::with_capacity(list.len());
for (_, seq) in list.iter() {
let virtual_column_meta: VirtualColumnMeta = deserialize_struct(&seq.data)?;
virtual_column_list.push(virtual_column_meta);
}

Ok(virtual_column_list)
}

#[tracing::instrument(level = "debug", ret, err, skip_all)]
async fn create_table(&self, req: CreateTableReq) -> Result<CreateTableReply, KVAppError> {
debug!(req = debug(&req), "SchemaApi: {}", func_name!());
Expand Down
Loading