Skip to content

Commit

Permalink
fix: dynamically get python udf from scripts table
Browse files Browse the repository at this point in the history
Signed-off-by: xxxuuu <[email protected]>
  • Loading branch information
xxxuuu committed Apr 22, 2024
1 parent 8f2ce4a commit 9a06553
Show file tree
Hide file tree
Showing 24 changed files with 610 additions and 324 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion src/cmd/src/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,11 +275,12 @@ impl StartCommand {
cached_meta_backend.clone(),
catalog_manager,
Arc::new(DatanodeClients::default()),
meta_client,
meta_client.clone(),
)
.with_plugin(plugins.clone())
.with_cache_invalidator(multi_cache_invalidator)
.with_heartbeat_task(heartbeat_task)
.with_meta_client(meta_client.clone())
.try_build()
.await
.context(StartFrontendSnafu)?;
Expand Down
26 changes: 26 additions & 0 deletions src/common/function/src/function_registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::collections::HashMap;
use std::sync::{Arc, RwLock};

use once_cell::sync::Lazy;
use session::context::QueryContextRef;

use crate::function::FunctionRef;
use crate::scalars::aggregate::{AggregateFunctionMetaRef, AggregateFunctions};
Expand All @@ -28,10 +29,17 @@ use crate::scalars::timestamp::TimestampFunction;
use crate::system::SystemFunction;
use crate::table::TableFunction;

pub trait FunctionProvider {
fn get_function(&self, name: &str, query_ctx: QueryContextRef) -> Option<FunctionRef>;
}

pub type FunctionProviderRef = Arc<dyn FunctionProvider + Sync + Send>;

#[derive(Default)]
pub struct FunctionRegistry {
functions: RwLock<HashMap<String, FunctionRef>>,
aggregate_functions: RwLock<HashMap<String, AggregateFunctionMetaRef>>,
dynamic_providers: RwLock<Vec<FunctionProviderRef>>,
}

impl FunctionRegistry {
Expand All @@ -51,6 +59,10 @@ impl FunctionRegistry {
.insert(func.name(), func);
}

pub fn register_provider(&self, provider: FunctionProviderRef) {
self.dynamic_providers.write().unwrap().push(provider);
}

pub fn get_aggr_function(&self, name: &str) -> Option<AggregateFunctionMetaRef> {
self.aggregate_functions.read().unwrap().get(name).cloned()
}
Expand All @@ -59,6 +71,20 @@ impl FunctionRegistry {
self.functions.read().unwrap().get(name).cloned()
}

pub fn get_function_fallback(
&self,
name: &str,
query_ctx: QueryContextRef,
) -> Option<FunctionRef> {
for provider in self.dynamic_providers.read().unwrap().iter() {
let fun = provider.get_function(name, query_ctx.clone());
if fun.is_some() {
return fun;
}
}
None
}

pub fn functions(&self) -> Vec<FunctionRef> {
self.functions.read().unwrap().values().cloned().collect()
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ impl Instance {
.enable_heartbeat()
.enable_procedure()
.enable_access_cluster_info()
.enable_lock()
.channel_manager(channel_manager)
.ddl_channel_manager(ddl_channel_manager)
.build();
Expand Down
17 changes: 16 additions & 1 deletion src/frontend/src/instance/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use common_meta::datanode_manager::DatanodeManagerRef;
use common_meta::ddl::ProcedureExecutorRef;
use common_meta::key::TableMetadataManager;
use common_meta::kv_backend::KvBackendRef;
use meta_client::client::MetaClientRef;
use operator::delete::Deleter;
use operator::insert::Inserter;
use operator::procedure::ProcedureServiceOperator;
Expand All @@ -46,6 +47,7 @@ pub struct FrontendBuilder {
plugins: Option<Plugins>,
procedure_executor: ProcedureExecutorRef,
heartbeat_task: Option<HeartbeatTask>,
meta_client: Option<MetaClientRef>,
}

impl FrontendBuilder {
Expand All @@ -63,6 +65,7 @@ impl FrontendBuilder {
plugins: None,
procedure_executor,
heartbeat_task: None,
meta_client: None,
}
}

Expand All @@ -87,6 +90,13 @@ impl FrontendBuilder {
}
}

pub fn with_meta_client(self, meta_client: MetaClientRef) -> Self {
Self {
meta_client: Some(meta_client),
..self
}
}

pub async fn try_build(self) -> Result<Instance> {
let kv_backend = self.kv_backend;
let datanode_manager = self.datanode_manager;
Expand Down Expand Up @@ -137,7 +147,12 @@ impl FrontendBuilder {
.query_engine();

let script_executor = Arc::new(
ScriptExecutor::new(self.catalog_manager.clone(), query_engine.clone()).await?,
ScriptExecutor::new(
self.catalog_manager.clone(),
query_engine.clone(),
self.meta_client,
)
.await?,
);

let statement_executor = Arc::new(StatementExecutor::new(
Expand Down
Loading

0 comments on commit 9a06553

Please sign in to comment.