Skip to content

Commit

Permalink
fix(executor): queries pipeline executor schedule incorrectly (#14787)
Browse files Browse the repository at this point in the history
* fix(executor): queries pipeline executor schedule incorrectly

* should not affect current executor
  • Loading branch information
dqhl76 authored Feb 29, 2024
1 parent 640a044 commit dfcc805
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ impl ExecutorSettings {
pub fn try_create(settings: &Settings, query_id: String) -> Result<ExecutorSettings> {
let max_execute_time_in_seconds = settings.get_max_execute_time_in_seconds()?;
Ok(ExecutorSettings {
enable_new_executor: settings.get_enable_experimental_new_executor()?,
enable_new_executor: settings.get_enable_experimental_queries_executor()?,
query_id: Arc::new(query_id),
max_execute_time_in_seconds: Duration::from_secs(max_execute_time_in_seconds),
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,18 @@ use std::time::Instant;

use databend_common_base::runtime::profile::Profile;
use databend_common_base::runtime::profile::ProfileStatisticsName;
use databend_common_base::runtime::TrySpawn;
use databend_common_exception::ErrorCode;
use databend_common_exception::Result;
use minitrace::future::FutureExt;
use minitrace::Span;
use petgraph::prelude::NodeIndex;

use crate::pipelines::executor::executor_graph::ProcessorWrapper;
use crate::pipelines::executor::processor_async_task::ExecutorTasksQueue;
use crate::pipelines::executor::ProcessorAsyncTask;
use crate::pipelines::executor::QueriesExecutorTasksQueue;
use crate::pipelines::executor::QueriesPipelineExecutor;
use crate::pipelines::executor::RunningGraph;
use crate::pipelines::executor::WorkersCondvar;

Expand Down Expand Up @@ -96,15 +103,30 @@ impl ExecutorWorkerContext {
}

/// # Safety
pub unsafe fn execute_task(&mut self) -> Result<Option<(NodeIndex, Arc<RunningGraph>)>> {
pub unsafe fn execute_task(
&mut self,
executor: Option<&Arc<QueriesPipelineExecutor>>,
) -> Result<Option<(NodeIndex, Arc<RunningGraph>)>> {
match std::mem::replace(&mut self.task, ExecutorTask::None) {
ExecutorTask::None => Err(ErrorCode::Internal("Execute none task.")),
ExecutorTask::Sync(processor) => self.execute_sync_task(processor),
ExecutorTask::Async(processor) => {
if let Some(executor) = executor {
self.execute_async_task(
processor,
executor,
executor.global_tasks_queue.clone(),
)
} else {
Err(ErrorCode::Internal(
"Async task should only be executed on queries executor",
))
}
}
ExecutorTask::AsyncCompleted(task) => match task.res {
Ok(_) => Ok(Some((task.id, task.graph))),
Err(cause) => Err(cause),
},
ExecutorTask::Async(_) => unreachable!("used for new executor"),
}
}

Expand All @@ -125,6 +147,40 @@ impl ExecutorWorkerContext {
Ok(Some((proc.processor.id(), proc.graph)))
}

/// Hands an async processor over to the queries executor's async runtime.
///
/// The active-async-worker counter on this worker's condvar is bumped first,
/// then the processor's future is wrapped in a [`ProcessorAsyncTask`] and
/// spawned on `executor.async_runtime`. The call does not wait for the
/// future: it always returns `Ok(None)` once the task has been spawned.
///
/// * `proc` - the processor (and the graph it belongs to) to run.
/// * `executor` - supplies the async runtime and the node profile lookup.
/// * `global_queue` - queue handed to the spawned task, wrapped as
///   `ExecutorTasksQueue::QueriesExecutorTasksQueue`.
pub fn execute_async_task(
    &mut self,
    proc: ProcessorWrapper,
    executor: &Arc<QueriesPipelineExecutor>,
    global_queue: Arc<QueriesExecutorTasksQueue>,
) -> Result<Option<(NodeIndex, Arc<RunningGraph>)>> {
    // NOTE(review): the block is kept as wide as the original; presumably
    // `async_process` and `id` are the unsafe calls -- confirm before narrowing.
    unsafe {
        // Register the in-flight async work before anything is spawned.
        let condvar = self.workers_condvar.clone();
        condvar.inc_active_async_worker();

        let owner_query = self.query_id.clone();
        let worker_to_wake = self.worker_id;

        let future = proc.processor.async_process();
        let profile = executor.graph.get_node_profile(proc.processor.id()).clone();
        let running_graph = proc.graph;

        // The task name must be taken before `owner_query` is moved into the task.
        let task_name = owner_query.as_ref().clone();
        let tasks_queue = Arc::new(ExecutorTasksQueue::QueriesExecutorTasksQueue(global_queue));
        let task = ProcessorAsyncTask::create(
            owner_query,
            worker_to_wake,
            proc.processor.clone(),
            tasks_queue,
            condvar,
            profile,
            running_graph,
            future,
        );
        let span = Span::enter_with_local_parent(std::any::type_name::<ProcessorAsyncTask>());

        executor.async_runtime.spawn(task_name, task.in_span(span));
    }

    // Nothing to reschedule synchronously; the task was only spawned here.
    Ok(None)
}

pub fn get_workers_condvar(&self) -> &Arc<WorkersCondvar> {
&self.workers_condvar
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ impl QueriesPipelineExecutor {
}

while !self.global_tasks_queue.is_finished() && context.has_task() {
if let Some((executed_pid, graph)) = context.execute_task()? {
if let Some((executed_pid, graph)) = context.execute_task(Some(self))? {
// Not scheduled graph if pipeline is finished.
if !self.global_tasks_queue.is_finished() {
// We immediately schedule the processor again.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ impl QueryPipelineExecutor {
}

while !self.global_tasks_queue.is_finished() && context.has_task() {
if let Some((executed_pid, graph)) = context.execute_task()? {
if let Some((executed_pid, graph)) = context.execute_task(None)? {
// Not scheduled graph if pipeline is finished.
if !self.global_tasks_queue.is_finished() {
// We immediately schedule the processor again.
Expand Down
2 changes: 1 addition & 1 deletion src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ impl DefaultSettings {
mode:SettingMode::Both,
range: Some(SettingRange::Numeric(0..=1))
}),
("enable_experimental_new_executor", DefaultSettingValue {
("enable_experimental_queries_executor", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "Enables experimental new executor",
mode: SettingMode::Both,
Expand Down
4 changes: 2 additions & 2 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -615,7 +615,7 @@ impl Settings {
self.try_set_u64("enable_geo_create_table", u64::from(val))
}

pub fn get_enable_experimental_new_executor(&self) -> Result<bool> {
Ok(self.try_get_u64("enable_experimental_new_executor")? == 1)
/// Reports whether the `enable_experimental_queries_executor` setting is on.
///
/// The setting is read as a `u64`; only the value `1` counts as enabled.
/// Any error from reading the setting is propagated to the caller.
pub fn get_enable_experimental_queries_executor(&self) -> Result<bool> {
    let raw = self.try_get_u64("enable_experimental_queries_executor")?;
    Ok(raw == 1)
}
}

0 comments on commit dfcc805

Please sign in to comment.