Skip to content

Commit

Permalink
feat: add throttle runtime (#3685)
Browse files Browse the repository at this point in the history
  • Loading branch information
ActivePeter committed Oct 23, 2024
1 parent 2ee1ce2 commit 8ecf99e
Show file tree
Hide file tree
Showing 31 changed files with 787 additions and 41 deletions.
30 changes: 30 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ opentelemetry-proto = { version = "0.5", features = [
"with-serde",
"logs",
] }
parking_lot = "0.12"
parquet = { version = "51.0.0", default-features = false, features = ["arrow", "async", "object_store"] }
paste = "1.0"
pin-project = "1.0"
Expand All @@ -148,6 +149,7 @@ promql-parser = { version = "0.4.1" }
prost = "0.12"
raft-engine = { version = "0.4.1", default-features = false }
rand = "0.8"
ratelimit = "0.9"
regex = "1.8"
regex-automata = { version = "0.4" }
reqwest = { version = "0.12", default-features = false, features = [
Expand Down
2 changes: 1 addition & 1 deletion src/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ enum_dispatch = "0.3"
futures-util.workspace = true
lazy_static.workspace = true
moka = { workspace = true, features = ["future"] }
parking_lot = "0.12"
parking_lot.workspace = true
prometheus.workspace = true
prost.workspace = true
query.workspace = true
Expand Down
15 changes: 15 additions & 0 deletions src/common/runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,36 @@ version.workspace = true
edition.workspace = true
license.workspace = true

[lib]
path = "src/lib.rs"

[[bin]]
name = "common-runtime-bin"
path = "src/bin.rs"

[lints]
workspace = true

[dependencies]
async-trait.workspace = true
clap.workspace = true
common-error.workspace = true
common-macro.workspace = true
common-telemetry.workspace = true
futures.workspace = true
lazy_static.workspace = true
num_cpus.workspace = true
once_cell.workspace = true
parking_lot.workspace = true
paste.workspace = true
pin-project.workspace = true
prometheus.workspace = true
rand.workspace = true
ratelimit.workspace = true
serde.workspace = true
serde_json.workspace = true
snafu.workspace = true
tempfile.workspace = true
tokio.workspace = true
tokio-metrics = "0.3"
tokio-metrics-collector = { git = "https://github.com/MichaelScofield/tokio-metrics-collector.git", rev = "89d692d5753d28564a7aac73c6ac5aba22243ba0" }
Expand Down
60 changes: 60 additions & 0 deletions src/common/runtime/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Greptime Runtime

## Run performance test for different priority & workload type

```
# workspace is at this subcrate
cargo run --release -- --loop-cnt 500
```

## Related PRs & ISSUEs

Check warning on line 10 in src/common/runtime/README.md

View workflow job for this annotation

GitHub Actions / Check typos and docs

"ISSU" should be "ISSUE".

Check warning on line 10 in src/common/runtime/README.md

View workflow job for this annotation

GitHub Actions / Spell Check with Typos

"ISSU" should be "ISSUE".

- Preliminary support cpu limitation

ISSUE: https://github.com/GreptimeTeam/greptimedb/issues/3685

PR: https://github.com/GreptimeTeam/greptimedb/pull/4782

## CPU resource constraints (ThrottleableRuntime)


To achieve CPU resource constraints, we adopt the concept of rate limiting. When creating a future, we first wrap it with another layer of future to intercept the poll operation during runtime. By using the ratelimit library, we can simply implement a mechanism that allows only a limited number of polls for a batch of tasks under a certain priority within a specific time frame (the current token generation interval is set to 10ms).

The default used runtime can be switched by
``` rust
pub type Runtime = DefaultRuntime;
```
in `runtime.rs`.

We tested four type of workload with 5 priorities, whose setup are as follows:

``` rust
impl Priority {
fn ratelimiter_count(&self) -> Result<Option<Ratelimiter>> {
let max = 8000;
let gen_per_10ms = match self {
Priority::VeryLow => Some(2000),
Priority::Low => Some(4000),
Priority::Middle => Some(6000),
Priority::High => Some(8000),
Priority::VeryHigh => None,
};
if let Some(gen_per_10ms) = gen_per_10ms {
Ratelimiter::builder(gen_per_10ms, Duration::from_millis(10)) // generate poll count per 10ms
.max_tokens(max) // reserved token for batch request
.build()
.context(BuildRuntimeRateLimiterSnafu)
.map(Some)
} else {
Ok(None)
}
}
}
```

This is the preliminary experimental effect so far:

![](resources/rdme-exp.png)

## TODO
- Introduce PID to achieve more accurate limitation.
Binary file added src/common/runtime/resources/rdme-exp.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
205 changes: 205 additions & 0 deletions src/common/runtime/src/bin.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use clap::Parser;

#[derive(Debug, Default, Parser)]
pub struct Command {
#[clap(long)]
loop_cnt: usize,
}

fn main() {
common_telemetry::init_default_ut_logging();
let cmd = Command::parse();

test_diff_priority_cpu::test_diff_workload_priority(cmd.loop_cnt);
}

mod test_diff_priority_cpu {
use std::path::PathBuf;

use common_runtime::runtime::{BuilderBuild, Priority, RuntimeTrait};
use common_runtime::{Builder, Runtime};
use common_telemetry::debug;
use tempfile::TempDir;

fn compute_pi_str(precision: usize) -> String {
let mut pi = 0.0;
let mut sign = 1.0;

for i in 0..precision {
pi += sign / (2 * i + 1) as f64;
sign *= -1.0;
}

pi *= 4.0;
format!("{:.prec$}", pi, prec = precision)
}

macro_rules! def_workload_enum {
($($variant:ident),+) => {
#[derive(Debug)]
enum WorkloadType {
$($variant),+
}

/// array of workloads for iteration
const WORKLOADS: &'static [WorkloadType] = &[
$( WorkloadType::$variant ),+
];
};
}

def_workload_enum!(
ComputeHeavily,
ComputeHeavily2,
WriteFile,
SpawnBlockingWriteFile
);

async fn workload_compute_heavily() {
let prefix = 10;

for _ in 0..3000 {
let _ = compute_pi_str(prefix);
tokio::task::yield_now().await;
}
}
async fn workload_compute_heavily2() {
let prefix = 30;
for _ in 0..2000 {
let _ = compute_pi_str(prefix);
tokio::task::yield_now().await;
}
}
async fn workload_write_file(_idx: u64, tempdir: PathBuf) {
use tokio::io::AsyncWriteExt;
let prefix = 50;

let mut file = tokio::fs::OpenOptions::new()
.write(true)
.append(true)
.create(true)
.open(tempdir.join(format!("pi_{}", prefix)))
.await
.unwrap();
for i in 0..200 {
let pi = compute_pi_str(prefix);

if i % 2 == 0 {
file.write_all(pi.as_bytes()).await.unwrap();
}
}
}
async fn workload_spawn_blocking_write_file(tempdir: PathBuf) {
use std::io::Write;
let prefix = 100;
let mut file = Some(
std::fs::OpenOptions::new()
.append(true)
.create(true)
.open(tempdir.join(format!("pi_{}", prefix)))
.unwrap(),
);
for i in 0..100 {
let pi = compute_pi_str(prefix);
if i % 2 == 0 {
let mut file1 = file.take().unwrap();
file = Some(
tokio::task::spawn_blocking(move || {
file1.write_all(pi.as_bytes()).unwrap();
file1
})
.await
.unwrap(),
);
}
}
}

pub fn test_diff_workload_priority(loop_cnt: usize) {
let tempdir = tempfile::tempdir().unwrap();
let priorities = [
Priority::VeryLow,
Priority::Low,
Priority::Middle,
Priority::High,
Priority::VeryHigh,
];
for wl in WORKLOADS {
for p in priorities.iter() {
let runtime: Runtime = Builder::default()
.runtime_name("test")
.thread_name("test")
.worker_threads(8)
.priority(*p)
.build()
.expect("Fail to create runtime");
let runtime2 = runtime.clone();
runtime.block_on(test_spec_priority_and_workload(
*p, runtime2, wl, &tempdir, loop_cnt,
));
}
}
}

async fn test_spec_priority_and_workload(
priority: Priority,
runtime: Runtime,
workload_id: &WorkloadType,
tempdir: &TempDir,
loop_cnt: usize,
) {
tokio::time::sleep(tokio::time::Duration::from_millis(1000)).await;
debug!(
"testing cpu usage for priority {:?} workload_id {:?}",
priority, workload_id,
);
// start monitor thread
let mut tasks = vec![];
let start = std::time::Instant::now();
for i in 0..loop_cnt {
// persist cpu usage in json: {priority}.{workload_id}
match *workload_id {
WorkloadType::ComputeHeavily => {
tasks.push(runtime.spawn(workload_compute_heavily()));
}
WorkloadType::ComputeHeavily2 => {
tasks.push(runtime.spawn(workload_compute_heavily2()));
}
WorkloadType::SpawnBlockingWriteFile => {
tasks.push(runtime.spawn(workload_spawn_blocking_write_file(
tempdir.path().to_path_buf(),
)));
}
WorkloadType::WriteFile => {
tasks.push(
runtime.spawn(workload_write_file(i as u64, tempdir.path().to_path_buf())),
);
}
}
}
for task in tasks {
task.await.unwrap();
}
let elapsed = start.elapsed();
debug!(
"test cpu usage for priority {:?} workload_id {:?} elapsed {}ms",
priority,
workload_id,
elapsed.as_millis()
);
}
}
Loading

0 comments on commit 8ecf99e

Please sign in to comment.