Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Limit CPU in runtime (#3685) #4782

Merged
merged 1 commit into from
Oct 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ opentelemetry-proto = { version = "0.5", features = [
"with-serde",
"logs",
] }
parking_lot = "0.12"
parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
pin-project = "1.0"
Expand All @@ -148,6 +149,7 @@ promql-parser = { version = "0.4.1" }
prost = "0.12"
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.8"
ratelimit = "0.9"
regex = "1.8"
regex-automata = { version = "0.4" }
reqwest = { version = "0.12", default-features = false, features = [
Expand Down
2 changes: 1 addition & 1 deletion src/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ enum_dispatch = "0.3"
futures-util.workspace = true
lazy_static.workspace = true
moka = { workspace = true, features = ["future"] }
parking_lot = "0.12"
parking_lot.workspace = true
prometheus.workspace = true
prost.workspace = true
query.workspace = true
Expand Down
15 changes: 15 additions & 0 deletions src/common/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,36 @@ version.workspace = true
edition.workspace = true
license.workspace = true

[lib]
path = "src/lib.rs"

[[bin]]
name = "common-runtime-bin"
path = "src/bin.rs"

[lints]
workspace = true

[dependencies]
async-trait.workspace = true
clap.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
futures.workspace = true
lazy_static.workspace = true
num_cpus.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
paste.workspace = true
pin-project.workspace = true
prometheus.workspace = true
rand.workspace = true
ratelimit.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
tempfile.workspace = true
tokio.workspace = true
tokio-metrics = "0.3"
tokio-metrics-collector = { git = "https://github.com/MichaelScofield/tokio-metrics-collector.git", rev = "89d692d5753d28564a7aac73c6ac5aba22243ba0" }
Expand Down
60 changes: 60 additions & 0 deletions src/common/runtime/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Greptime Runtime

## Run performance test for different priority & workload type

```
# workspace is at this subcrate
cargo run --release -- --loop-cnt 500
```

## Related PRs & issues

- Preliminary support cpu limitation

ISSUE: https://github.com/GreptimeTeam/greptimedb/issues/3685

PR: https://github.com/GreptimeTeam/greptimedb/pull/4782

## CPU resource constraints (ThrottleableRuntime)


To achieve CPU resource constraints, we adopt the concept of rate limiting. When creating a future, we first wrap it with another layer of future to intercept the poll operation during runtime. By using the ratelimit library, we can simply implement a mechanism that allows only a limited number of polls for a batch of tasks under a certain priority within a specific time frame (the current token generation interval is set to 10ms).

The default used runtime can be switched by
``` rust
pub type Runtime = DefaultRuntime;
```
in `runtime.rs`.

We tested four type of workload with 5 priorities, whose setup are as follows:

``` rust
impl Priority {
fn ratelimiter_count(&self) -> Result<Option<Ratelimiter>> {
let max = 8000;
let gen_per_10ms = match self {
Priority::VeryLow => Some(2000),
Priority::Low => Some(4000),
Priority::Middle => Some(6000),
Priority::High => Some(8000),
Priority::VeryHigh => None,
};
if let Some(gen_per_10ms) = gen_per_10ms {
Ratelimiter::builder(gen_per_10ms, Duration::from_millis(10)) // generate poll count per 10ms
.max_tokens(max) // reserved token for batch request
.build()
.context(BuildRuntimeRateLimiterSnafu)
.map(Some)
} else {
Ok(None)
}
}
}
```

This is the preliminary experimental effect so far:

![](resources/rdme-exp.png)

## TODO
- Introduce PID to achieve more accurate limitation.
Binary file added src/common/runtime/resources/rdme-exp.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
205 changes: 205 additions & 0 deletions src/common/runtime/src/bin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use clap::Parser;

#[derive(Debug, Default, Parser)]
pub struct Command {
#[clap(long)]
loop_cnt: usize,
}

fn main() {
common_telemetry::init_default_ut_logging();
let cmd = Command::parse();

test_diff_priority_cpu::test_diff_workload_priority(cmd.loop_cnt);
}

mod test_diff_priority_cpu {
use std::path::PathBuf;

use common_runtime::runtime::{BuilderBuild, Priority, RuntimeTrait};
use common_runtime::{Builder, Runtime};
use common_telemetry::debug;
use tempfile::TempDir;

fn compute_pi_str(precision: usize) -> String {
let mut pi = 0.0;
let mut sign = 1.0;

for i in 0..precision {
pi += sign / (2 * i + 1) as f64;
sign *= -1.0;
}

pi *= 4.0;
format!("{:.prec$}", pi, prec = precision)
}

macro_rules! def_workload_enum {
($($variant:ident),+) => {
#[derive(Debug)]
enum WorkloadType {
$($variant),+
}

/// array of workloads for iteration
const WORKLOADS: &'static [WorkloadType] = &[
$( WorkloadType::$variant ),+
];
};
}

def_workload_enum!(
ComputeHeavily,
ComputeHeavily2,
WriteFile,
SpawnBlockingWriteFile
);

async fn workload_compute_heavily() {
let prefix = 10;

for _ in 0..3000 {
let _ = compute_pi_str(prefix);
tokio::task::yield_now().await;
}
}
async fn workload_compute_heavily2() {
let prefix = 30;
for _ in 0..2000 {
let _ = compute_pi_str(prefix);
tokio::task::yield_now().await;
}
}
async fn workload_write_file(_idx: u64, tempdir: PathBuf) {
use tokio::io::AsyncWriteExt;
let prefix = 50;

let mut file = tokio::fs::OpenOptions::new()
.write(true)
.append(true)
.create(true)
.open(tempdir.join(format!("pi_{}", prefix)))
.await
.unwrap();
for i in 0..200 {
let pi = compute_pi_str(prefix);

if i % 2 == 0 {
file.write_all(pi.as_bytes()).await.unwrap();
}
}
}
async fn workload_spawn_blocking_write_file(tempdir: PathBuf) {
use std::io::Write;
let prefix = 100;
let mut file = Some(
std::fs::OpenOptions::new()
.append(true)
.create(true)
.open(tempdir.join(format!("pi_{}", prefix)))
.unwrap(),
);
for i in 0..100 {
let pi = compute_pi_str(prefix);
if i % 2 == 0 {
let mut file1 = file.take().unwrap();
file = Some(
tokio::task::spawn_blocking(move || {
file1.write_all(pi.as_bytes()).unwrap();
file1
})
.await
.unwrap(),
);
}
}
}

pub fn test_diff_workload_priority(loop_cnt: usize) {
let tempdir = tempfile::tempdir().unwrap();
let priorities = [
Priority::VeryLow,
Priority::Low,
Priority::Middle,
Priority::High,
Priority::VeryHigh,
];
for wl in WORKLOADS {
for p in priorities.iter() {
let runtime: Runtime = Builder::default()
.runtime_name("test")
.thread_name("test")
.worker_threads(8)
.priority(*p)
.build()
.expect("Fail to create runtime");
let runtime2 = runtime.clone();
runtime.block_on(test_spec_priority_and_workload(
*p, runtime2, wl, &tempdir, loop_cnt,
));
}
}
}

async fn test_spec_priority_and_workload(
priority: Priority,
runtime: Runtime,
workload_id: &WorkloadType,
tempdir: &TempDir,
loop_cnt: usize,
) {
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
debug!(
"testing cpu usage for priority {:?} workload_id {:?}",
priority, workload_id,
);
// start monitor thread
let mut tasks = vec![];
let start = std::time::Instant::now();
for i in 0..loop_cnt {
// persist cpu usage in json: {priority}.{workload_id}
match *workload_id {
WorkloadType::ComputeHeavily => {
tasks.push(runtime.spawn(workload_compute_heavily()));
}
WorkloadType::ComputeHeavily2 => {
tasks.push(runtime.spawn(workload_compute_heavily2()));
}
WorkloadType::SpawnBlockingWriteFile => {
tasks.push(runtime.spawn(workload_spawn_blocking_write_file(
tempdir.path().to_path_buf(),
)));
}
WorkloadType::WriteFile => {
tasks.push(
runtime.spawn(workload_write_file(i as u64, tempdir.path().to_path_buf())),
);
}
}
}
for task in tasks {
task.await.unwrap();
}
let elapsed = start.elapsed();
debug!(
"test cpu usage for priority {:?} workload_id {:?} elapsed {}ms",
priority,
workload_id,
elapsed.as_millis()
);
}
}
Loading