# feat(prover): Add sending scale requests for Scaler targets (#3194)
## What ❔

Add sending of scale requests for Scaler targets.
Add a dry-run config option for the Scaler.


## Why ❔

Simple Scaler targets previously only had their replica counts computed and exported as metrics; this change wires them into the scale-request path so the computed sizes are actually applied. The dry-run option allows running the Scaler without sending any scale requests, which makes the rollout safer.

## Checklist


- [x] PR title corresponds to the body of PR (we generate changelog
entries from PRs).
- [ ] Tests for the changes have been added / updated.
- [ ] Documentation comments have been added / updated.
- [x] Code has been formatted via `zkstack dev fmt` and `zkstack dev
lint`.


ref ZKD-1855

---------

Co-authored-by: Manuel Mauro <[email protected]>
yorik and manuelmauro authored Oct 30, 2024
1 parent 1f8ad26 commit 767c5bc
Showing 7 changed files with 88 additions and 35 deletions.
5 changes: 4 additions & 1 deletion core/lib/config/src/configs/prover_autoscaler.rs
@@ -63,6 +63,9 @@ pub struct ProverAutoscalerScalerConfig {
pub long_pending_duration: Duration,
/// List of simple autoscaler targets.
pub scaler_targets: Vec<ScalerTarget>,
/// If dry-run is enabled, don't send any scale requests.
#[serde(default)]
pub dry_run: bool,
}
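The `#[serde(default)]` attribute on the new `dry_run` field means existing config files that never mention the key keep deserializing, with the flag falling back to `false`. A minimal, self-contained sketch of that behavior, assuming the `serde` and `serde_yaml` crates and using a stand-in struct rather than the real `ProverAutoscalerScalerConfig`:

```rust
use serde::Deserialize;

// Stand-in for illustration only; the real config has many more fields.
#[derive(Debug, Deserialize)]
struct ScalerConfigSketch {
    #[serde(default)]
    dry_run: bool,
}

fn main() {
    // Key omitted: `dry_run` falls back to `bool::default()`, i.e. false.
    let old: ScalerConfigSketch = serde_yaml::from_str("{}").unwrap();
    assert!(!old.dry_run);

    // Key present: parsed as usual.
    let new: ScalerConfigSketch = serde_yaml::from_str("dry_run: true").unwrap();
    assert!(new.dry_run);

    println!("{old:?} / {new:?}");
}
```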

#[derive(
@@ -122,7 +125,7 @@ pub enum QueueReportFields {
#[derive(Debug, Clone, PartialEq, Deserialize, Default)]
pub struct ScalerTarget {
pub queue_report_field: QueueReportFields,
pub pod_name_prefix: String,
pub deployment: String,
/// Max replicas per cluster.
pub max_replicas: HashMap<String, usize>,
/// The queue will be divided by the speed and rounded up to get number of replicas.
@@ -52,9 +52,10 @@ message MaxReplica {

message ScalerTarget {
optional string queue_report_field = 1; // required
optional string pod_name_prefix = 2; // required
optional string deployment = 5; // required
repeated MaxReplica max_replicas = 3; // required at least one
optional uint64 speed = 4; // optional
reserved 2; reserved "pod_name_prefix";
}

message ProverAutoscalerScalerConfig {
@@ -69,4 +70,5 @@ message ProverAutoscalerScalerConfig {
repeated MaxProver max_provers = 9; // optional
repeated MinProver min_provers = 10; // optional
repeated ScalerTarget scaler_targets = 11; // optional
optional bool dry_run = 12; // optional
}
8 changes: 4 additions & 4 deletions core/lib/protobuf_config/src/prover_autoscaler.rs
@@ -118,6 +118,7 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig {
.enumerate()
.map(|(i, x)| x.read().context(i).unwrap())
.collect::<Vec<_>>(),
dry_run: self.dry_run.unwrap_or_default(),
})
}

@@ -158,6 +159,7 @@ impl ProtoRepr for proto::ProverAutoscalerScalerConfig {
.map(|(k, v)| proto::MinProver::build(&(k.clone(), *v)))
.collect(),
scaler_targets: this.scaler_targets.iter().map(ProtoRepr::build).collect(),
dry_run: Some(this.dry_run),
}
}
}
@@ -269,9 +271,7 @@ impl ProtoRepr for proto::ScalerTarget {
queue_report_field: required(&self.queue_report_field)
.and_then(|x| Ok((*x).parse()?))
.context("queue_report_field")?,
pod_name_prefix: required(&self.pod_name_prefix)
.context("pod_name_prefix")?
.clone(),
deployment: required(&self.deployment).context("deployment")?.clone(),
max_replicas: self
.max_replicas
.iter()
@@ -289,7 +289,7 @@
fn build(this: &Self::Type) -> Self {
Self {
queue_report_field: Some(this.queue_report_field.to_string()),
pod_name_prefix: Some(this.pod_name_prefix.clone()),
deployment: Some(this.deployment.clone()),
max_replicas: this
.max_replicas
.iter()
2 changes: 1 addition & 1 deletion prover/crates/bin/prover_autoscaler/src/agent.rs
@@ -96,7 +96,7 @@ pub struct ScaleRequest {
pub deployments: Vec<ScaleDeploymentRequest>,
}

#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Default, Serialize, Deserialize)]
pub struct ScaleResponse {
pub scale_result: Vec<String>,
}
92 changes: 66 additions & 26 deletions prover/crates/bin/prover_autoscaler/src/global/scaler.rs
@@ -83,7 +83,7 @@ pub struct GpuScaler {

pub struct SimpleScaler {
queue_report_field: QueueReportFields,
pod_name_prefix: String,
deployment: String,
/// Which cluster to use first.
cluster_priorities: HashMap<String, u32>,
max_replicas: HashMap<String, usize>,
@@ -365,6 +365,47 @@ impl GpuScaler {

provers
}

fn diff(
namespace: &str,
provers: HashMap<GPUPoolKey, u32>,
clusters: &Clusters,
requests: &mut HashMap<String, ScaleRequest>,
) {
provers
.into_iter()
.for_each(|(GPUPoolKey { cluster, gpu }, replicas)| {
let prover = gpu_to_prover(gpu);
clusters
.clusters
.get(&cluster)
.and_then(|c| c.namespaces.get(namespace))
.and_then(|ns| ns.deployments.get(&prover))
.map_or_else(
|| {
tracing::error!(
"Wasn't able to find deployment {} in cluster {}, namespace {}",
prover,
cluster,
namespace
)
},
|deployment| {
if deployment.desired != replicas as i32 {
requests
.entry(cluster.clone())
.or_default()
.deployments
.push(ScaleDeploymentRequest {
namespace: namespace.into(),
name: prover.clone(),
size: replicas as i32,
});
}
},
);
})
}
}

#[derive(Default, Debug, PartialEq, Eq)]
@@ -389,7 +430,7 @@ impl SimpleScaler {
) -> Self {
Self {
queue_report_field: config.queue_report_field.clone(),
pod_name_prefix: config.pod_name_prefix.clone(),
deployment: config.deployment.clone(),
cluster_priorities,
max_replicas: config.max_replicas.clone(),
speed: config.speed,
@@ -418,7 +459,7 @@ impl SimpleScaler {
// Initialize pool only if we have ready deployments.
pool.pods.insert(PodStatus::Running, 0);

let pod_re = Regex::new(&format!("^{}-", self.pod_name_prefix)).unwrap();
let pod_re = Regex::new(&format!("^{}-", self.deployment)).unwrap();
for (_, pod) in namespace_value
.pods
.iter()
@@ -551,47 +592,46 @@ impl SimpleScaler {

pods
}
}

fn diff(
namespace: &str,
provers: HashMap<GPUPoolKey, u32>,
clusters: &Clusters,
requests: &mut HashMap<String, ScaleRequest>,
) {
provers
.into_iter()
.for_each(|(GPUPoolKey { cluster, gpu }, n)| {
let prover = gpu_to_prover(gpu);
fn diff(
&self,
namespace: &str,
replicas: HashMap<String, usize>,
clusters: &Clusters,
requests: &mut HashMap<String, ScaleRequest>,
) {
let deployment_name = self.deployment.clone();
replicas.into_iter().for_each(|(cluster, replicas)| {
clusters
.clusters
.get(&cluster)
.and_then(|c| c.namespaces.get(namespace))
.and_then(|ns| ns.deployments.get(&prover))
.and_then(|ns| ns.deployments.get(&deployment_name))
.map_or_else(
|| {
tracing::error!(
"Wasn't able to find deployment {} in cluster {}, namespace {}",
prover,
deployment_name,
cluster,
namespace
)
},
|d| {
if d.desired != n as i32 {
|deployment| {
if deployment.desired != replicas as i32 {
requests
.entry(cluster.clone())
.or_default()
.deployments
.push(ScaleDeploymentRequest {
namespace: namespace.into(),
name: prover.clone(),
size: n as i32,
name: deployment_name.clone(),
size: replicas as i32,
});
}
},
);
})
}
}

/// is_namespace_running returns true if there are some pods running in it.
@@ -638,7 +678,7 @@ impl Task for Scaler {
AUTOSCALER_METRICS.provers[&(k.cluster.clone(), ns.clone(), k.gpu)]
.set(*num as u64);
}
diff(ns, provers, &guard.clusters, &mut scale_requests);
GpuScaler::diff(ns, provers, &guard.clusters, &mut scale_requests);
}

// Simple Scalers.
Expand All @@ -647,15 +687,15 @@ impl Task for Scaler {
.get(&(ppv.to_string(), scaler.queue_report_field.clone()))
.cloned()
.unwrap_or(0);
tracing::debug!("Running eval for namespace {ns}, PPV {ppv}, simple scaler {} found queue {q}", scaler.pod_name_prefix);
tracing::debug!("Running eval for namespace {ns}, PPV {ppv}, simple scaler {} found queue {q}", scaler.deployment);
if q > 0 || is_namespace_running(ns, &guard.clusters) {
let pods = scaler.run(ns, q, &guard.clusters);
for (k, num) in &pods {
let replicas = scaler.run(ns, q, &guard.clusters);
for (k, num) in &replicas {
AUTOSCALER_METRICS.jobs
[&(scaler.pod_name_prefix.clone(), k.clone(), ns.clone())]
[&(scaler.deployment.clone(), k.clone(), ns.clone())]
.set(*num as u64);
}
// TODO: diff and add into scale_requests.
scaler.diff(ns, replicas, &guard.clusters, &mut scale_requests);
}
}
}
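Both `GpuScaler::diff` and the new `SimpleScaler::diff` follow the same pattern: compare the computed replica count per cluster against the deployment's current desired size and queue a `ScaleDeploymentRequest` only where they differ. A simplified, standalone sketch of that idea — the types, the flat `cluster -> desired` map, and the values in `main` are illustrative stand-ins, not the real `Clusters` model:

```rust
use std::collections::HashMap;

// Illustrative stand-ins for the autoscaler's request types.
#[derive(Debug, Default)]
struct ScaleDeploymentRequest {
    namespace: String,
    name: String,
    size: i32,
}

#[derive(Debug, Default)]
struct ScaleRequest {
    deployments: Vec<ScaleDeploymentRequest>,
}

/// Queue a scale request per cluster, but only where the wanted replica
/// count differs from what the cluster currently runs.
fn diff_into_requests(
    namespace: &str,
    deployment: &str,
    wanted: HashMap<String, usize>,           // cluster -> target replicas
    currently_desired: &HashMap<String, i32>, // cluster -> replicas deployed now
    requests: &mut HashMap<String, ScaleRequest>,
) {
    for (cluster, replicas) in wanted {
        match currently_desired.get(&cluster) {
            None => eprintln!(
                "Wasn't able to find deployment {deployment} in cluster {cluster}, namespace {namespace}"
            ),
            Some(&desired) if desired != replicas as i32 => {
                requests
                    .entry(cluster)
                    .or_default()
                    .deployments
                    .push(ScaleDeploymentRequest {
                        namespace: namespace.into(),
                        name: deployment.into(),
                        size: replicas as i32,
                    });
            }
            _ => {} // already at the target size, nothing to send
        }
    }
}

fn main() {
    let current = HashMap::from([("cluster-a".to_string(), 1)]);
    let mut requests = HashMap::new();
    diff_into_requests(
        "prover-blue",
        "circuit-prover",
        HashMap::from([("cluster-a".to_string(), 3usize)]),
        &current,
        &mut requests,
    );
    println!("{requests:?}"); // one request: scale circuit-prover to 3 in cluster-a
}
```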
9 changes: 8 additions & 1 deletion prover/crates/bin/prover_autoscaler/src/global/watcher.rs
@@ -38,11 +38,12 @@ pub fn check_is_ready(v: &Vec<bool>) -> Result<()> {
pub struct Watcher {
/// List of base URLs of all agents.
pub cluster_agents: Vec<Arc<Url>>,
pub dry_run: bool,
pub data: Arc<Mutex<WatchedData>>,
}

impl Watcher {
pub fn new(agent_urls: Vec<String>) -> Self {
pub fn new(agent_urls: Vec<String>, dry_run: bool) -> Self {
let size = agent_urls.len();
Self {
cluster_agents: agent_urls
@@ -54,6 +55,7 @@ impl Watcher {
)
})
.collect(),
dry_run,
data: Arc::new(Mutex::new(WatchedData {
clusters: Clusters::default(),
is_ready: vec![false; size],
@@ -80,6 +82,7 @@ impl Watcher {
.collect();
}

let dry_run = self.dry_run;
let handles: Vec<_> = id_requests
.into_iter()
.map(|(id, sr)| {
@@ -92,6 +95,10 @@ impl Watcher {
tokio::spawn(async move {
let mut headers = HeaderMap::new();
headers.insert(CONTENT_TYPE, HeaderValue::from_static("application/json"));
if dry_run {
tracing::info!("Dry-run mode, not sending the request.");
return Ok((id, Ok(ScaleResponse::default())));
}
let response = send_request_with_retries(
&url,
MAX_RETRIES,
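The dry-run branch short-circuits before the HTTP call and returns `Ok((id, Ok(ScaleResponse::default())))`, which is why `ScaleResponse` gains the `Default` derive in `agent.rs`. A stripped-down sketch of the same gating — synchronous, no retries, and with simplified names and URLs that are not part of the real agent API:

```rust
#[derive(Debug, Default)]
struct ScaleResponse {
    scale_result: Vec<String>,
}

// In dry-run mode, skip the network entirely and hand back an empty but
// well-formed response so the rest of the pipeline stays unchanged.
fn send_scale(dry_run: bool, url: &str) -> Result<ScaleResponse, String> {
    if dry_run {
        println!("Dry-run mode, not sending the request to {url}.");
        return Ok(ScaleResponse::default());
    }
    // The real code POSTs the ScaleRequest here, with retries.
    Err(format!("sketch does not implement the network path for {url}"))
}

fn main() {
    let resp = send_scale(true, "http://agent.example/scale").expect("dry-run never fails");
    assert!(resp.scale_result.is_empty());
    println!("{resp:?}");
}
```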
3 changes: 2 additions & 1 deletion prover/crates/bin/prover_autoscaler/src/main.rs
@@ -110,7 +110,8 @@ async fn main() -> anyhow::Result<()> {
let interval = scaler_config.scaler_run_interval.unsigned_abs();
let exporter_config = PrometheusExporterConfig::pull(scaler_config.prometheus_port);
tasks.push(tokio::spawn(exporter_config.run(stop_receiver.clone())));
let watcher = global::watcher::Watcher::new(scaler_config.agents.clone());
let watcher =
global::watcher::Watcher::new(scaler_config.agents.clone(), scaler_config.dry_run);
let queuer = global::queuer::Queuer::new(scaler_config.prover_job_monitor_url.clone());
let scaler = global::scaler::Scaler::new(watcher.clone(), queuer, scaler_config);
tasks.extend(get_tasks(watcher, scaler, interval, stop_receiver)?);
