Revert "*: make unified-pool use FuturePool (tikv#15925)" (tikv#16050)
close tikv#16015

Revert "*: make unified-pool use FuturePool (tikv#15925)"
- revert due to performance regression

Signed-off-by: nolouch <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
nolouch and ti-chi-bot[bot] committed Dec 21, 2023
1 parent e1aa7a7 commit 6d2b495
Showing 2 changed files with 88 additions and 54 deletions.
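
For readers skimming the diff below: the revert swaps the unified read pool's FuturePool back for a raw yatp::ThreadPool<TaskCell>, so the handle once again holds a Remote<TaskCell> and does its own task-count and pool-size bookkeeping. The following sketch is not part of the patch; it is a minimal illustration of the restored spawn path, built only from calls that appear in the diff (Extras::new_multilevel, TaskCell::new, Remote::spawn). The helper name spawn_on_remote, the AtomicUsize counter (TiKV uses an IntGauge metric), and the error string are illustrative assumptions.

use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

use yatp::{pool::Remote, queue::Extras, task::future::TaskCell};

// Hypothetical helper mirroring what ReadPoolHandle::spawn does after this revert.
fn spawn_on_remote(
    remote: &Remote<TaskCell>,
    running_tasks: Arc<AtomicUsize>,
    max_tasks: usize,
    task_id: u64,
    f: impl std::future::Future<Output = ()> + Send + 'static,
) -> Result<(), &'static str> {
    // A raw yatp remote has no built-in task limit, so the handle checks it by
    // hand; as the patch comment notes, the check is best-effort under
    // concurrent spawns.
    if running_tasks.load(Ordering::Relaxed) >= max_tasks {
        return Err("unified read pool is full");
    }
    running_tasks.fetch_add(1, Ordering::Relaxed);

    // Wrap the future in a TaskCell carrying multilevel-queue extras and hand
    // it directly to the pool's Remote.
    let extras = Extras::new_multilevel(task_id, None);
    remote.spawn(TaskCell::new(
        async move {
            f.await;
            running_tasks.fetch_sub(1, Ordering::Relaxed);
        },
        extras,
    ));
    Ok(())
}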
components/tikv_util/src/yatp_pool/mod.rs (15 changes: 2 additions & 13 deletions)
@@ -394,26 +394,15 @@ impl<T: PoolTicker> YatpPoolBuilder<T> {
         FuturePool::from_pool(pool, &name, size, task)
     }

-    fn build_single_level_pool(self) -> ThreadPool<TaskCell> {
+    pub fn build_single_level_pool(self) -> ThreadPool<TaskCell> {
         let (builder, runner) = self.create_builder();
         builder.build_with_queue_and_runner(
             yatp::queue::QueueType::SingleLevel,
             yatp::pool::CloneRunnerBuilder(runner),
         )
     }

-    pub fn build_multi_level_future_pool(self) -> FuturePool {
-        let name = self
-            .name_prefix
-            .clone()
-            .unwrap_or_else(|| "yatp_pool".to_string());
-        let size = self.core_thread_count;
-        let task = self.max_tasks;
-        let pool = self.build_multi_level_pool();
-        FuturePool::from_pool(pool, &name, size, task)
-    }
-
-    fn build_multi_level_pool(self) -> ThreadPool<TaskCell> {
+    pub fn build_multi_level_pool(self) -> ThreadPool<TaskCell> {
         let name = self
             .name_prefix
             .clone()
src/read_pool.rs (127 changes: 86 additions & 41 deletions)
@@ -27,7 +27,9 @@ use tikv_util::{
     yatp_pool::{self, CleanupMethod, FuturePool, PoolTicker, YatpPoolBuilder},
 };
 use tracker::TrackedFuture;
-use yatp::{metrics::MULTILEVEL_LEVEL_ELAPSED, queue::Extras};
+use yatp::{
+    metrics::MULTILEVEL_LEVEL_ELAPSED, pool::Remote, queue::Extras, task::future::TaskCell,
+};

 use self::metrics::*;
 use crate::{
@@ -53,9 +55,11 @@ pub enum ReadPool {
         read_pool_low: FuturePool,
     },
     Yatp {
-        pool: FuturePool,
-        // deprecated. will remove in the v8.x.
+        pool: yatp::ThreadPool<TaskCell>,
         running_tasks: IntGauge,
+        running_threads: IntGauge,
+        max_tasks: usize,
+        pool_size: usize,
         resource_ctl: Option<Arc<ResourceController>>,
         time_slice_inspector: Arc<TimeSliceInspector>,
     },
@@ -76,11 +80,17 @@ impl ReadPool {
             ReadPool::Yatp {
                 pool,
                 running_tasks,
+                running_threads,
+                max_tasks,
+                pool_size,
                 resource_ctl,
                 time_slice_inspector,
             } => ReadPoolHandle::Yatp {
-                remote: pool.clone(),
+                remote: pool.remote().clone(),
                 running_tasks: running_tasks.clone(),
+                running_threads: running_threads.clone(),
+                max_tasks: *max_tasks,
+                pool_size: *pool_size,
                 resource_ctl: resource_ctl.clone(),
                 time_slice_inspector: time_slice_inspector.clone(),
             },
@@ -96,8 +106,11 @@ pub enum ReadPoolHandle {
         read_pool_low: FuturePool,
     },
     Yatp {
-        remote: FuturePool,
+        remote: Remote<TaskCell>,
         running_tasks: IntGauge,
+        running_threads: IntGauge,
+        max_tasks: usize,
+        pool_size: usize,
         resource_ctl: Option<Arc<ResourceController>>,
         time_slice_inspector: Arc<TimeSliceInspector>,
     },
@@ -132,10 +145,19 @@ impl ReadPoolHandle {
             ReadPoolHandle::Yatp {
                 remote,
                 running_tasks,
+                max_tasks,
                 resource_ctl,
                 ..
             } => {
                 let running_tasks = running_tasks.clone();
+                // Note that the running task number limit is not strict.
+                // If several tasks are spawned at the same time while the running task number
+                // is close to the limit, they may all pass this check and the number of running
+                // tasks may exceed the limit.
+                if running_tasks.get() as usize >= *max_tasks {
+                    return Err(ReadPoolError::UnifiedReadPoolFull);
+                }
+
                 running_tasks.inc();
                 let fixed_level = match priority {
                     CommandPri::High => Some(0),
@@ -145,26 +167,31 @@
                 let group_name = metadata.group_name().to_owned();
                 let mut extras = Extras::new_multilevel(task_id, fixed_level);
                 extras.set_metadata(metadata.to_vec());
-                if let Some(resource_ctl) = resource_ctl {
-                    let fut = TrackedFuture::new(with_resource_limiter(
-                        ControlledFuture::new(
-                            async move {
-                                f.await;
-                                running_tasks.dec();
-                            },
-                            resource_ctl.clone(),
-                            group_name,
-                        ),
-                        resource_limiter,
-                    ));
-                    remote.spawn_with_extras(fut, extras)?;
+                let task_cell = if let Some(resource_ctl) = resource_ctl {
+                    TaskCell::new(
+                        TrackedFuture::new(with_resource_limiter(
+                            ControlledFuture::new(
+                                async move {
+                                    f.await;
+                                    running_tasks.dec();
+                                },
+                                resource_ctl.clone(),
+                                group_name,
+                            ),
+                            resource_limiter,
+                        )),
+                        extras,
+                    )
                 } else {
-                    let fut = async move {
-                        f.await;
-                        running_tasks.dec();
-                    };
-                    remote.spawn_with_extras(fut, extras)?;
-                }
+                    TaskCell::new(
+                        TrackedFuture::new(async move {
+                            f.await;
+                            running_tasks.dec();
+                        }),
+                        extras,
+                    )
+                };
+                remote.spawn(task_cell);
             }
         }
         Ok(())
@@ -204,7 +231,7 @@ impl ReadPoolHandle {
             ReadPoolHandle::FuturePools {
                 read_pool_normal, ..
             } => read_pool_normal.get_pool_size(),
-            ReadPoolHandle::Yatp { remote, .. } => remote.get_pool_size(),
+            ReadPoolHandle::Yatp { pool_size, .. } => *pool_size,
         }
     }

@@ -214,10 +241,10 @@
                 read_pool_normal, ..
             } => read_pool_normal.get_running_task_count() / read_pool_normal.get_pool_size(),
             ReadPoolHandle::Yatp {
-                remote,
                 running_tasks,
+                pool_size,
                 ..
-            } => running_tasks.get() as usize / remote.get_pool_size(),
+            } => running_tasks.get() as usize / *pool_size,
         }
     }

@@ -226,8 +253,19 @@
             ReadPoolHandle::FuturePools { .. } => {
                 unreachable!()
             }
-            ReadPoolHandle::Yatp { remote, .. } => {
-                remote.scale_pool_size(max_thread_count);
+            ReadPoolHandle::Yatp {
+                remote,
+                running_threads,
+                max_tasks,
+                pool_size,
+                ..
+            } => {
+                remote.scale_workers(max_thread_count);
+                *max_tasks = max_tasks
+                    .saturating_div(*pool_size)
+                    .saturating_mul(max_thread_count);
+                running_threads.set(max_thread_count as i64);
+                *pool_size = max_thread_count;
             }
         }
     }
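
(Aside, not part of the diff: the rescaling above keeps the per-worker task budget roughly constant. With hypothetical numbers, if max_tasks_per_worker were 2000 and the pool started with 8 threads, max_tasks would begin at 16000; scaling down to 4 threads recomputes it as 16000 / 8 * 4 = 8000.)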
@@ -430,11 +468,6 @@ pub fn build_yatp_read_pool_with_name<E: Engine, R: FlowStatsReporter>(
                 config.max_thread_count,
             ),
         )
-        .max_tasks(
-            config
-                .max_tasks_per_worker
-                .saturating_mul(config.max_thread_count),
-        )
         .after_start(move || {
             let engine = raftkv.lock().unwrap().clone();
             set_tls_engine(engine);
@@ -450,15 +483,21 @@
     }

     let pool = if let Some(ref r) = resource_ctl {
-        builder.build_priority_future_pool(r.clone())
+        builder.build_priority_pool(r.clone())
     } else {
-        builder.build_multi_level_future_pool()
+        builder.build_multi_level_pool()
     };
     let time_slice_inspector = Arc::new(TimeSliceInspector::new(&unified_read_pool_name));
     ReadPool::Yatp {
         pool,
         running_tasks: UNIFIED_READ_POOL_RUNNING_TASKS
             .with_label_values(&[&unified_read_pool_name]),
+        running_threads: UNIFIED_READ_POOL_RUNNING_THREADS
+            .with_label_values(&[&unified_read_pool_name]),
+        max_tasks: config
+            .max_tasks_per_worker
+            .saturating_mul(config.max_thread_count),
+        pool_size: config.max_thread_count,
         resource_ctl,
         time_slice_inspector,
     }
@@ -727,6 +766,12 @@ mod metrics {
             &["name"]
         )
         .unwrap();
+        pub static ref UNIFIED_READ_POOL_RUNNING_THREADS: IntGaugeVec = register_int_gauge_vec!(
+            "tikv_unified_read_pool_thread_count",
+            "The number of running threads in the unified read pool",
+            &["name"]
+        )
+        .unwrap();
     }
 }

@@ -792,7 +837,7 @@ mod tests {

         thread::sleep(Duration::from_millis(300));
         match handle.spawn(task3, CommandPri::Normal, 3, TaskMetadata::default(), None) {
-            Err(ReadPoolError::FuturePoolFull(..)) => {}
+            Err(ReadPoolError::UnifiedReadPoolFull) => {}
             _ => panic!("should return full error"),
         }
         tx1.send(()).unwrap();
@@ -847,7 +892,7 @@ mod tests {

         thread::sleep(Duration::from_millis(300));
         match handle.spawn(task3, CommandPri::Normal, 3, TaskMetadata::default(), None) {
-            Err(ReadPoolError::FuturePoolFull(..)) => {}
+            Err(ReadPoolError::UnifiedReadPoolFull) => {}
             _ => panic!("should return full error"),
         }

Expand All @@ -860,7 +905,7 @@ mod tests {

         thread::sleep(Duration::from_millis(300));
         match handle.spawn(task5, CommandPri::Normal, 5, TaskMetadata::default(), None) {
-            Err(ReadPoolError::FuturePoolFull(..)) => {}
+            Err(ReadPoolError::UnifiedReadPoolFull) => {}
             _ => panic!("should return full error"),
         }
     }
@@ -909,7 +954,7 @@ mod tests {

         thread::sleep(Duration::from_millis(300));
         match handle.spawn(task3, CommandPri::Normal, 3, TaskMetadata::default(), None) {
-            Err(ReadPoolError::FuturePoolFull(..)) => {}
+            Err(ReadPoolError::UnifiedReadPoolFull) => {}
             _ => panic!("should return full error"),
         }

Expand All @@ -926,7 +971,7 @@ mod tests {

         thread::sleep(Duration::from_millis(300));
         match handle.spawn(task5, CommandPri::Normal, 5, TaskMetadata::default(), None) {
-            Err(ReadPoolError::FuturePoolFull(..)) => {}
+            Err(ReadPoolError::UnifiedReadPoolFull) => {}
             _ => panic!("should return full error"),
         }
     }
